originals-tz
11/28/2019 - 2:05 AM

libevent

#include "stdafx.h"
#include "..\include\IRPCAgentIMPL.h"

#include <event2/thread.h>
#include <event2/buffer.h>
#include <sstream>

#include <iostream>
#include <cassert>

namespace rpc
{
    IRPCAgentIMPL::IRPCAgentIMPL()
        : m_base(nullptr),
          m_socket(nullptr),
          m_listener(nullptr),
          m_handler(&DefaultHandler)
    {
    }

    //! Tears the agent down by delegating to Release(), which frees the
    //! listener, the client connection and the event base.
    IRPCAgentIMPL::~IRPCAgentIMPL()
    {
        Release();
    }

    //! Installs the callback that receives every chunk of data read from the
    //! client connection.
    //!
    //! @param pHandle  Callback to invoke from ReadData(). Passing a null/empty
    //!                 handler restores DefaultHandler.
    //!
    //! m_handler is written exactly once with its final value, so it never
    //! holds a null handler, not even momentarily. ReadData() invokes
    //! m_handler without a null check, and it is driven by the event loop
    //! that Run() starts on a separate thread.
    void IRPCAgentIMPL::InitReceivePBHandle(HandleReceive pHandle)
    {
        if (pHandle)
        {
            m_handler = pHandle;
        }
        else
        {
            m_handler = &DefaultHandler;
        }
    }

    void IRPCAgentIMPL::Run(const uint16_t& port)
    {
        Release();
        Init(port);
        HANDLE handle = CreateThread(NULL, 0, EventLoop, m_base, 0, NULL);
        CloseHandle(handle);
    }

    //! Queues a message for sending on the current client connection.
    //!
    //! @param pstr  Pointer to the bytes to send.
    //! @param size  Number of bytes to send from pstr.
    //!
    //! Silently does nothing when no client is connected.
    void IRPCAgentIMPL::SendPBMsg(const char *pstr, const uint32_t& size)
    {
        if (m_socket == nullptr)
        {
            return;
        }

        bufferevent_write(m_socket, pstr, size);
    }

    void IRPCAgentIMPL::Release()
    {
        //! 应释放完所有event再释放event_base
        ReleaseListener();
        ReleaseSocket();
        ReleaseBase();
        WSACleanup();
    }

    //! Thread entry point used by Run().
    //!
    //! @param pparam  The event_base to dispatch on, passed as an opaque pointer.
    //! @return        Always 0, once event_base_dispatch() returns.
    DWORD IRPCAgentIMPL::EventLoop(void* pparam)
    {
        event_base_dispatch(static_cast<event_base*>(pparam));
        return 0L;
    }

    void IRPCAgentIMPL::Init(const uint16_t& port)
    {
        WORD wVersionRequested;
        WSADATA wsaData;
        wVersionRequested = MAKEWORD(2, 2);
        WSAStartup(wVersionRequested, &wsaData);

        //! 开启多线程支持
        evthread_use_windows_threads();
        m_base = event_base_new();

        //! 初始化监听socket
        struct sockaddr_in sin;
        memset(&sin, 0, sizeof(struct sockaddr_in));
        sin.sin_family = AF_INET;
        sin.sin_port = htons(port);

        m_listener = evconnlistener_new_bind(m_base, ListenCallback, this, LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE | LEV_OPT_THREADSAFE, 1, (struct sockaddr*)&sin, sizeof(struct sockaddr_in));
    }

    //! Listener callback registered in Init(); forwards each newly accepted
    //! socket to the owning agent.
    //!
    //! @param listener  Listener that accepted the connection (unused).
    //! @param fd        Socket of the new connection.
    //! @param sock      Peer address (unused).
    //! @param socklen   Length of the peer address (unused).
    //! @param arg       The IRPCAgentIMPL that registered this callback.
    void IRPCAgentIMPL::ListenCallback(evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sock, int socklen, void *arg)
    {
        static_cast<IRPCAgentIMPL*>(arg)->Accept(fd);
    }

    void IRPCAgentIMPL::Accept(const evutil_socket_t& fd)
    {
        //! 如果链接已建立,则拒绝建立新的连接
        if (m_socket)
        {
            evutil_closesocket(fd);
            return;
        }

        //! 初始化新连接
        m_socket = bufferevent_socket_new(m_base,fd,BEV_OPT_CLOSE_ON_FREE);
        bufferevent_setcb(m_socket, ReadCallback, nullptr, ErrorCallback, this);
        bufferevent_enable(m_socket, EV_READ | EV_PERSIST);
    }

    //! Drains everything currently buffered on the client connection's input
    //! and passes it to the registered handler in a single call.
    //!
    //! The payload is handled as raw bytes: each chunk is appended with its
    //! exact length, so embedded NUL bytes are preserved and nothing past the
    //! bytes actually read is touched. The size given to the handler always
    //! equals the number of bytes in the buffer it receives.
    void IRPCAgentIMPL::ReadData()
    {
        if (!m_socket)
        {
            return;
        }

        struct evbuffer* input = bufferevent_get_input(m_socket);

        std::string data;
        data.reserve(evbuffer_get_length(input));

        // Read in fixed-size chunks until the input buffer is empty.
        char chunk[1024];
        while (evbuffer_get_length(input) > 0)
        {
            const int n = evbuffer_remove(input, chunk, sizeof(chunk));
            if (n <= 0)
            {
                // No progress was made; bail out instead of spinning forever.
                break;
            }
            data.append(chunk, static_cast<std::size_t>(n));
        }

        // Hand the collected bytes to the callback.
        const uint32_t size = static_cast<uint32_t>(data.size());
        m_handler(data.c_str(), size);
    }

    //! Stops the event loop and frees the event base, if one exists.
    //!
    //! NOTE(review): the thread started by Run() is not waited for between
    //! the loop break and the free -- confirm it cannot still be inside
    //! event_base_dispatch() when the base is freed.
    void IRPCAgentIMPL::ReleaseBase()
    {
        if (m_base == nullptr)
        {
            return;
        }

        event_base_loopbreak(m_base);
        event_base_free(m_base);
        m_base = nullptr;
    }

    //! Frees the client connection's bufferevent, if any, and clears the
    //! pointer so a new client can be accepted.
    void IRPCAgentIMPL::ReleaseSocket()
    {
        if (m_socket == nullptr)
        {
            return;
        }

        bufferevent_free(m_socket);
        m_socket = nullptr;
    }

    //! Frees the connection listener, if any, and clears the pointer.
    void IRPCAgentIMPL::ReleaseListener()
    {
        if (m_listener == nullptr)
        {
            return;
        }

        evconnlistener_free(m_listener);
        m_listener = nullptr;
    }

    //! Bufferevent read callback installed by Accept(); forwards to ReadData()
    //! on the owning agent.
    //!
    //! @param bev  Bufferevent that has data available (unused; ReadData()
    //!             reads from m_socket).
    //! @param ctx  The IRPCAgentIMPL that registered this callback.
    void IRPCAgentIMPL::ReadCallback(bufferevent *bev, void* ctx)
    {
        static_cast<IRPCAgentIMPL*>(ctx)->ReadData();
    }

    void IRPCAgentIMPL::ErrorCallback(struct bufferevent *bev, short events, void* ctx)
    {
        IRPCAgentIMPL* obj = static_cast<IRPCAgentIMPL*>(ctx);
        obj->ReleaseSocket();
    }

    //! Placeholder handler used while no real receive handler is installed.
    //! Being called at all is treated as a programming error: it asserts.
    //!
    //! @param pstr  Received data (ignored).
    //! @param size  Number of received bytes (ignored).
    void IRPCAgentIMPL::DefaultHandler(const char* pstr, const uint32_t& size)
    {
        // If this fires, the causes thought of so far are:
        // 1. Asynchronous initialisation: no handler had been set by the time data arrived, so that data is lost.
        // 2. Asynchronous teardown: the handler callback was removed before the socket was closed, so that data is lost.
        // 3. If you do not need the data callback at all, it is recommended not to call init until you do.
        assert(0 && "Agent : 没有设置Handler");
    }