#include "stdafx.h"
#include "..\include\IRPCAgentIMPL.h"
#include <event2/thread.h>
#include <event2/buffer.h>
#include <sstream>
#include <iostream>
#include <cassert>
namespace rpc
{
IRPCAgentIMPL::IRPCAgentIMPL()
: m_base(nullptr),
m_socket(nullptr),
m_listener(nullptr),
m_handler(&DefaultHandler)
{
}
IRPCAgentIMPL::~IRPCAgentIMPL()
{
Release();
}
void IRPCAgentIMPL::InitReceivePBHandle(HandleReceive pHandle)
{
m_handler = pHandle;
if (!m_handler)
{
m_handler = &DefaultHandler;
return;
}
}
void IRPCAgentIMPL::Run(const uint16_t& port)
{
Release();
Init(port);
HANDLE handle = CreateThread(NULL, 0, EventLoop, m_base, 0, NULL);
CloseHandle(handle);
}
void IRPCAgentIMPL::SendPBMsg(const char *pstr, const uint32_t& size)
{
if (m_socket)
{
bufferevent_write(m_socket, pstr, size);
}
}
void IRPCAgentIMPL::Release()
{
//! 应释放完所有event再释放event_base
ReleaseListener();
ReleaseSocket();
ReleaseBase();
WSACleanup();
}
DWORD IRPCAgentIMPL::EventLoop(void* pparam)
{
event_base* base = static_cast<event_base*>(pparam);
event_base_dispatch(base);
return 0L;
}
void IRPCAgentIMPL::Init(const uint16_t& port)
{
WORD wVersionRequested;
WSADATA wsaData;
wVersionRequested = MAKEWORD(2, 2);
WSAStartup(wVersionRequested, &wsaData);
//! 开启多线程支持
evthread_use_windows_threads();
m_base = event_base_new();
//! 初始化监听socket
struct sockaddr_in sin;
memset(&sin, 0, sizeof(struct sockaddr_in));
sin.sin_family = AF_INET;
sin.sin_port = htons(port);
m_listener = evconnlistener_new_bind(m_base, ListenCallback, this, LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE | LEV_OPT_THREADSAFE, 1, (struct sockaddr*)&sin, sizeof(struct sockaddr_in));
}
void IRPCAgentIMPL::ListenCallback(evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sock, int socklen, void *arg)
{
IRPCAgentIMPL *obj = (IRPCAgentIMPL*)arg;
obj->Accept(fd);
}
void IRPCAgentIMPL::Accept(const evutil_socket_t& fd)
{
//! 如果链接已建立,则拒绝建立新的连接
if (m_socket)
{
evutil_closesocket(fd);
return;
}
//! 初始化新连接
m_socket = bufferevent_socket_new(m_base,fd,BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(m_socket, ReadCallback, nullptr, ErrorCallback, this);
bufferevent_enable(m_socket, EV_READ | EV_PERSIST);
}
void IRPCAgentIMPL::ReadData()
{
std::stringstream ss;
uint32_t size = 0;
struct evbuffer* input = bufferevent_get_input(m_socket);
//! 循环读取数据
while (evbuffer_get_length(input))
{
char buffer[1024];
int n = evbuffer_remove(input, buffer, sizeof(buffer));
if (n > 0)
{
size += n;
ss << buffer;
}
}
//! 调用回调函数
std::string data = ss.str();
m_handler(data.c_str(), size);
}
void IRPCAgentIMPL::ReleaseBase()
{
if (m_base)
{
event_base_loopbreak(m_base);
event_base_free(m_base);
m_base = nullptr;
}
}
void IRPCAgentIMPL::ReleaseSocket()
{
if (m_socket)
{
bufferevent_free(m_socket);
m_socket = nullptr;
}
}
void IRPCAgentIMPL::ReleaseListener()
{
if (m_listener)
{
evconnlistener_free(m_listener);
m_listener = nullptr;
}
}
void IRPCAgentIMPL::ReadCallback(bufferevent *bev, void* ctx)
{
IRPCAgentIMPL* obj = static_cast<IRPCAgentIMPL*>(ctx);
obj->ReadData();
}
void IRPCAgentIMPL::ErrorCallback(struct bufferevent *bev, short events, void* ctx)
{
IRPCAgentIMPL* obj = static_cast<IRPCAgentIMPL*>(ctx);
obj->ReleaseSocket();
}
void IRPCAgentIMPL::DefaultHandler(const char* pstr, const uint32_t& size)
{
// 发生这个问题, 目前想到有几种可能性
// 1. 由于初始化异步问题, 在接到数据之前都没有设置handle,这将会导致数据丢失
// 2. 由于释放资源的异步问题, 在关闭socket之前就关闭handle回调,这将会导致数据丢失
// 3. 如果你本身就不需要数据回调,建议不要调用init,直到有需要的时候才调用
assert(0 && "Agent : 没有设置Handler");
}