[apue] 一个工业级、跨平台的 tcp 网络服务框架:gevent

时间:2023-03-09 22:00:45
[apue] 一个工业级、跨平台的 tcp 网络服务框架:gevent

作为公司的公共产品,经常有这样的需求:就是新建一个本地服务,产品线作为客户端通过 tcp 接入本地服务,来获取想要的业务能力。

与印象中动辄处理成千上万连接的 tcp 网络服务不同,这个本地服务是跑在客户机器上的,Win32 上作为开机自启动的 windows 服务运行;

Linux 上作为 daemon 在后台运行。总的说来就是用于接收几个产品进程的连接,因此轻量化是其最重要的要求,在这个基础上要能兼顾跨平台就可以了。

其实主要就是 windows,再兼顾一点儿 linux。

考察了几个现有的开源网络框架,从 ACE 、boost::asio 到 libevent,都有不尽于人意的地方:

a) ACE:太重,只是想要一个网络框架,结果它扒拉扒拉一堆全提供了,不用还不行;

b) boost::asio:太复杂,牵扯到 boost 库,并且引入了一堆 c++ 模板,需要高版本 c++ 编译器支持;

c) libevent:这个看着不错,当时确实用这个做底层封装了一版,结果发版后发现一个比较致命的问题,导致在防火墙设置比较严格的机器上初始化失败,这个后面我会详细提到。

其它的就更不用说了,之前也粗略看过陈硕的 muddo,总的感觉吧,它是基于其它开源框架不足地方改进的一个库,有相当可取的地方,但是这个改进的方向也主要是解决更大并发、更多连接,不是我的痛点,所以没有继续深入研究。

好了,与其在不同开源框架之间纠结,不如自己动手写一个。

反正我的场景比较固定,不用像它们那样面面俱,我给自己罗列了一些这个框架需要支持基本的功能:

1)同步写、异步读;

2)可同时监听多路事件,基于 1)这里只针对异步 READ 事件(包含连接进入、连接断开),写数据是同步的,因而不需要处理异步 WRITE 事件;

3)要有设置一次性和周期性定时器的能力 (业务决定的);

4)不需要处理信号 (windows 上也没信号这一说,linux 自己搞搞 sigaction 就好啦);

……

虽然这个框架未来只会运行在用户的单机上,但是我不希望它一出生就带有性能缺陷,所以性能平平的 select 没能进入我的法眼,我决定给它装上最强大的心脏:

Windows 平台: iocp

Linux 平台:epoll

ok,从需求到底层技术路线,貌似都讲清楚了,依照 libevent 我给它取名为 gevent,下面我们从代码级别看下这个框架是怎么简化 tcp 服务搭建这类工作的。

首先看一下这个 tcp 服务框架的 sample:

svc_handler.h

 #include "EventBase.h"
#include "EventHandler.h" class GMyEventBase : public GEventBase
{
public:
GEventHandler* create_handler ();
}; class svc_handler : public GJsonEventHandler
{
public:
virtual ~svc_handler () {}
virtual void on_read_msg (Json::Value const& val);
};

epoll_svc.cpp

 #include <stdio.h>
#include "svc_handler.h"
#include <signal.h> GMyEventBase g_base;
GEventHandler* GMyEventBase::create_handler ()
{
return new svc_handler;
} void sig_int (int signo)
{
printf ("%d caught\n", signo);
g_base.exit ();
printf ("exit ok\n");
} int main (int argc, char *argv[])
{
if (argc < )
{
printf ("usage: epoll_svc port\n");
return -;
} unsigned short port = atoi (argv[]); #ifndef WIN32
struct sigaction act;
act.sa_handler = sig_int;
sigemptyset(&act.sa_mask);
act.sa_flags = SA_RESTART;
if (sigaction (SIGINT, &act, NULL) < )
{
printf ("install SIGINT failed, errno %d\n", errno);
return -;
}
else
printf ("install SIGINT ok\n");
#endif // to test small message block
if (g_base.init (/*8, 10*/) < )
return -; printf ("init ok\n");
do
{
if (!g_base.listen (port))
{
g_base.exit ();
printf ("exit ok\n");
break;
} printf ("listen ok\n");
g_base.run ();
printf ("run over\n");
} while (); g_base.fini ();
printf ("fini ok\n"); g_base.cleanup ();
printf ("cleanup ok\n");
return ;
}

这个服务的核心是 GMyEventBase 类,它使用了框架中的 GEventBase 类,从后者派生而来,

只改写了一个 create_handler 接口来提供我们的事件处理对象 svc_handler,它是从框架中的 GEventHandler 派生而来,

svc_handler 只改写了一个 on_read_msg 来处理 Json 格式的消息输入。

程序的运行就是分别调用 GMyEventBase(实际上是GEventBase)  的 init / listen / run / fini / cleaup 方法。

而与业务相关的代码,都在 svc_handler 中处理:

svc_handler.cpp

 #include "svc_handler.h"

 void svc_handler::on_read_msg (Json::Value const& val)
{
int key = val["key"].asInt ();
std::string data = val["data"].asString ();
printf ("got %d:%s\n", key, data.c_str ()); Json::Value root;
Json::FastWriter writer;
root["key"] = key + ;
root["data"] = data; int ret = ;
std::string resp = writer.write(root);
resp = resp.substr (, resp.length () - ); // trim tailing \n
if ((ret = send (resp)) <= )
printf ("send response failed, errno %d\n", errno);
else
printf ("response %d\n", ret);
}

它期待 Json 格式的数据,并且有两个字段 key(int) 与 data (string),接收数据后将 key 增 1 后返回给客户端。

再来看下客户端 sample:

clt_handler.h

 #include "EventBaseAR.h"
#include "EventHandler.h" class GMyEventBase : public GEventBaseWithAutoReconnect
{
public:
GEventHandler* create_handler ();
}; class clt_handler : public GJsonEventHandler
{
public:
virtual ~clt_handler () {}
#ifdef TEST_TIMER
virtual bool on_timeout (GEV_PER_TIMER_DATA *gptd);
#endif
virtual void on_read_msg (Json::Value const& val);
};

epoll_clt.cpp

 #include <stdio.h>
#include "clt_handler.h"
#include <signal.h> //#define TEST_READ
//#define TEST_CONN
//#define TEST_TIMER GMyEventBase g_base;
GEventHandler* GMyEventBase::create_handler ()
{
return new clt_handler;
} int sig_caught = ;
void sig_int (int signo)
{
sig_caught = ;
printf ("%d caught\n", signo);
g_base.exit ();
printf ("exit ok\n");
} void do_read (GEventHandler *eh, int total)
{
char buf[] = { };
int ret = , n = , key = , err = ;
char *ptr = nullptr;
while ((total == || n++ < total) && fgets (buf, sizeof(buf), stdin) != NULL)
{
// skip \n
buf[strlen(buf) - ] = ;
//n = sscanf (buf, "%d", &key);
key = strtol (buf, &ptr, );
if (ptr == nullptr)
{
printf ("format: int string\n");
continue;
} Json::Value root;
Json::FastWriter writer;
root["key"] = key;
// skip space internal
root["data"] = *ptr == ' ' ? ptr + : ptr; std::string req = writer.write (root);
req = req.substr (, req.length () - ); // trim tailing \n
if ((ret = eh->send (req)) <= )
{
err = ;
printf ("send %d failed, errno %d\n", req.length (), errno);
break;
}
else
printf ("send %d\n", ret);
} if (total == )
printf ("reach end\n"); if (!err)
{
eh->disconnect ();
printf ("call disconnect to notify server\n");
} // wait receiving thread
//sleep (3);
// if use press Ctrl+D, need to notify peer our break
} #ifdef TEST_TIMER
void test_timer (unsigned short port, int period_msec, int times)
{
int n = ;
GEventHandler *eh = nullptr; do
{
eh = g_base.connect (port);
if (eh == nullptr)
break; printf ("connect ok\n");
void* t = g_base.timeout (, period_msec, eh, NULL);
if (t == NULL)
{
printf ("timeout failed\n");
break;
}
else
printf ("set timer %p ok\n", t); // to wait timer
do
{
sleep ();
printf ("wake up from sleep\n");
} while (!sig_caught && n++ < times); g_base.cancel_timer (t);
} while ();
}
#endif #ifdef TEST_CONN
void test_conn (unsigned short port, int per_read, int times)
{
# ifdef WIN32
srand (GetCurrentProcessId());
# else
srand (getpid ());
# endif
int n = , elapse = ;
clt_handler *eh = nullptr; do
{
eh = (clt_handler *)g_base.connect (port);
if (eh == nullptr)
break; printf ("connect ok\n"); do_read (eh, per_read);
# ifdef WIN32
elapse = rand() % ;
Sleep(elapse);
printf ("running %d ms\n", elapse);
# else
elapse = rand () % ;
usleep (elapse);
printf ("running %.3f ms\n", elapse/1000.0);
# endif } while (!sig_caught && n++ < times);
}
#endif #ifdef TEST_READ
void test_read (unsigned short port, int total)
{
int n = ;
GEventHandler *eh = nullptr; do
{
eh = g_base.connect (port);
if (eh == nullptr)
break; printf ("connect ok\n");
do_read (eh, total);
} while ();
}
#endif int main (int argc, char *argv[])
{
if (argc < )
{
printf ("usage: epoll_clt port\n");
return -;
} unsigned short port = atoi (argv[]); #ifndef WIN32
struct sigaction act;
act.sa_handler = sig_int;
sigemptyset(&act.sa_mask);
// to ensure read be breaked by SIGINT
act.sa_flags = ; //SA_RESTART;
if (sigaction (SIGINT, &act, NULL) < )
{
printf ("install SIGINT failed, errno %d\n", errno);
return -;
}
#endif if (g_base.init () < )
return -; printf ("init ok\n"); #if defined(TEST_READ)
test_read (port, ); // 0 means infinite loop until user break
#elif defined(TEST_CONN)
test_conn (port, , );
#elif defined (TEST_TIMER)
test_timer (port, , );
#else
# error please define TEST_XXX macro to do something!
#endif if (!sig_caught)
{
// Ctrl + D ?
g_base.exit ();
printf ("exit ok\n");
}
else
printf ("has caught Ctrl+C\n"); g_base.fini ();
printf ("fini ok\n"); g_base.cleanup ();
printf ("cleanup ok\n");
return ;
}

客户端同样使用了 GEventBase 的派生类 GMyEventBase 来作为事件循环的核心,所不同的是(注意并非之前例子里的那个类,虽然同名),它提供了 clt_handler 来处理自己的业务代码。

另外为了提供连接中断后自动向服务重连的功能,这里 GMyEventBase 派生自 GEventBase 类的子类 GEventBaseWithAutoReconnect (位于 EventBaseAR.h/cpp 中)。

程序的运行是分别调用 GEventBase 的 init / connect / fini / cleaup 方法以及 GEventHandler 的 send / disconnect 来测试读写与连接。

定义宏 TEST_READ 用来测试读写;定义宏 TEST_CONN 可以测试连接的通断及读写;定义宏 TEST_TIMER 来测试周期性定时器及读写。它们是互斥的。

clt_handler 主要用来异步接收服务端的回送数据并打印:

clt_handler.cpp

 #include "clt_handler.h"

 #ifdef TEST_TIMER
extern void do_read (clt_handler *, int);
bool clt_handler::on_timeout (GEV_PER_TIMER_DATA *gptd)
{
printf ("time out ! id %p, due %d, period %d\n", gptd, gptd->due_msec, gptd->period_msec);
do_read ((clt_handler *)gptd->user_arg, );
return true;
}
#endif void clt_handler::on_read_msg (Json::Value const& val)
{
int key = val["key"].asInt ();
std::string data = val["data"].asString ();
printf ("got %d:%s\n", key, data.c_str ());
}

这个测试程序可以通过在控制台手工输入数据来驱动,也可以通过测试数据文件来驱动,下面的 awk 脚本用来制造符合格式的测试数据:

epoll_gen.awk

 #! /bin/awk -f
BEGIN {
WORDNUM =
for (i = ; i <= WORDNUM; i++) {
printf("%d %s\n", randint(WORDNUM), randword())
}
} # randint(n): return a random integer number which is >= and <= n
function randint(n) {
return int(n *rand()) +
} # randlet(): return a random letter, which maybe upper, lower or number.
function randlet() {
return substr("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", randint(), )
} # randword(LEN): return a rand word with a length of LEN
function randword(LEN) {
randw=""
for( j = ; j <= LEN; j++) {
randw=randw randlet()
}
return randw
}

生成的测试文件格式如下:

238 s0jKlYkEjwE4q3nNJugF
568 0cgNaSgDpP3VS45x3Wum
996 kRF6SgmIReFmrNBcCecj
398 QHQqCrB5fC61hao1BV2x
945 XZ6KLtA4jZTEnhcAugAM
619 WE95NU7FnsYar4wz279j
549 oVCTmD516yvmtuJB2NG3
840 NDAaL5vpzp8DQX0rLRiV
378 jONIm64AN6UVc7uTLIIR
251 EqSBOhc40pKXhCbCu8Ey

整个工程编译的话就是一个 CMakeLists 文件,可以通过 cmake 生成对应的 Makefile 或 VS solution 来编译代码:

CMakeLists.txt

 cmake_minimum_required(VERSION 3.0)
project(epoll_svc)
include_directories(../core ../include)
set(CMAKE_CXX_FLAGS "-std=c++11 -pthread -g -Wall ${CMAKE_CXX_FLAGS}")
link_directories(${PROJECT_SOURCE_DIR}/../lib)
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/../bin) add_executable (epoll_svc epoll_svc.cpp svc_handler.cpp ../core/EventBase.cpp ../core/EventHandler.cpp ../core/log.cpp)
IF (WIN32)
target_link_libraries(epoll_svc jsoncpp ws2_32)
ELSE ()
target_link_libraries(epoll_svc jsoncpp rt)
ENDIF () add_executable (epoll_clt epoll_clt.cpp clt_handler.cpp ../core/EventBase.cpp ../core/EventBaseAR.cpp ../core/EventHandler.cpp ../core/log.cpp)
target_compile_definitions(epoll_clt PUBLIC -D TEST_READ)
IF (WIN32)
target_link_libraries(epoll_clt jsoncpp ws2_32)
ELSE ()
target_link_libraries(epoll_clt jsoncpp rt)
ENDIF () add_executable (epoll_local epoll_local.cpp)
IF (WIN32)
target_link_libraries(epoll_local jsoncpp ws2_32)
ELSE ()
target_link_libraries(epoll_local jsoncpp rt)
ENDIF ()

这个项目包含三个编译目标,分别是 epoll_svc 、epoll_clt 与 epoll_local,其中前两个可以跨平台编译,后一个只能在 Linux 平台编译,用来验证 epoll 的一些特性。

编译完成后,首先运行服务端:

>./epoll_svc 1025

然后运行客户端:

>./epoll_clt 1025 < demo

测试多个客户端同时连接,可以使用下面的脚本:

epoll_start.sh

 #! /bin/bash
# /bin/sh -> /bin/dash, do not recognize our for loop for((i=;i<;i=i+))
do
./epoll_clt < demo &
echo "start $i"
done

可以同时启动 10 个客户端。

通过 Ctrl+C 退出服务端;通过 Ctrl+C 或 Ctrl+D 退出单个客户端;

通过下面的脚本来停止多个客户端与服务端:

epoll_stop.sh

 #! /bin/sh
pkill -INT epoll_clt
sleep
pkill -INT epoll_svc

框架的用法介绍完之后,再简单游览一下这个库的各层级对外接口。

EventBase.h

 #pragma once

 #include "EventHandler.h"
#include <string>
#include <map>
#include <mutex>
#include <condition_variable>
#include "thread_group.hpp" #define GEV_MAX_BUF_SIZE 65536 class GEventBase : public IEventBase
{
public:
GEventBase();
~GEventBase(); #ifdef WIN32
virtual HANDLE iocp () const;
#else
virtual int epfd () const;
#endif
virtual bool post_timer(GEV_PER_TIMER_DATA *gptd);
virtual GEventHandler* create_handler() = ; // thr_num :
// =0 - no default thread pool, user provide thread and call run
// <0 - use max(|thr_num|, processer_num)
// >0 - use thr_num
bool init(int thr_num = -, int blksize = GEV_MAX_BUF_SIZE
#ifndef WIN32
, int timer_sig = SIGUSR1
#endif
); bool listen(unsigned short port, unsigned short backup = );
GEventHandler* connect(unsigned short port, GEventHandler* exist_handler = NULL);
// PARAM
// due_msec: first timeout milliseconds
// period_msec: later periodically milliseconds
// arg: user provied argument
// exist_handler: reuse the timer handler
//
// RETURN
// NULL: failed
void* timeout(int due_msec, int period_msec, void *arg, GEventHandler *exist_handler);
bool cancel_timer(void* tid);
void fini();
void run();
void exit(int extra_notify = );
void cleanup(); protected:
#ifdef WIN32
bool do_accept(GEV_PER_IO_DATA *gpid);
bool do_recv(GEV_PER_HANDLE_DATA *gphd, GEV_PER_IO_DATA *gpid);
void do_error(GEV_PER_HANDLE_DATA *gphd); int init_socket();
bool issue_accept();
bool issue_read(GEV_PER_HANDLE_DATA *gphd);
bool post_completion(DWORD bytes, ULONG_PTR key, LPOVERLAPPED ol); #else
bool do_accept(int fd);
bool do_recv(conn_key_t key);
void do_error(conn_key_t key); bool init_pipe();
void close_pipe();
bool post_notify (char ch, void* ptr = nullptr);
void promote_leader (std::unique_lock<std::mutex> &guard); GEventHandler* find_by_key (conn_key_t key, bool erase);
GEventHandler* find_by_fd (int fd, conn_key_t &key, bool erase); # ifdef HAS_SIGTHR
void sig_proc ();
# endif
#endif bool do_timeout(GEV_PER_TIMER_DATA *gptd); virtual bool on_accept(GEV_PER_HANDLE_DATA *gphd);
virtual bool on_read(GEventHandler *h, GEV_PER_IO_DATA *gpid);
virtual void on_error(GEventHandler *h);
virtual bool on_timeout (GEV_PER_TIMER_DATA *gptd); protected:
volatile bool m_running = false;
int m_thrnum = ;
int m_blksize = GEV_MAX_BUF_SIZE;
std::thread_group m_grp;
SOCKET m_listener = INVALID_SOCKET; std::mutex m_mutex; // protect m_map
std::mutex m_tlock; // protect m_tmap
// timer_t may conflict when new timer created after old timer closed
//std::map <timer_t, GEventHandler *> m_tmap;
std::map <GEV_PER_TIMER_DATA*, GEventHandler *> m_tmap; #ifdef WIN32
LPFN_ACCEPTEX m_acceptex = nullptr;
LPFN_GETACCEPTEXSOCKADDRS m_getacceptexsockaddrs = nullptr;
HANDLE m_iocp = NULL;
HANDLE m_timerque = NULL; std::map<GEV_PER_HANDLE_DATA*, GEventHandler*> m_map;
#else
int m_ep = -;
int m_pp[];
int m_tsig = ; // signal number for timer std::mutex m_lock; // protect epoll
pthread_t m_leader = -;
std::map<conn_key_t, GEventHandler*> m_map;
# ifdef HAS_SIGTHR
// special thread only cares about signal
std::thread *m_sigthr = nullptr;
# endif
#endif
};
  • init,它在底层启动 thr_num 个线程来跑 run 方法;每次 IO 的块缓冲区大小由 blksize 指定;它内部还创建了对应的 iocp 或 epoll 对象,便于之后加入 socket 句柄进行处理。
  • exit,它通知线程池中的所有线程退出等待,windows 上是通过 PostQueuedCompletionStatus,Linux 上是通过在自建的一个 pipe 上写数据以触发 epoll 退出(这个 pipe 在 init 中创建并加入 epoll);
  • fini,它在所有工作线程退出后,关闭之前创建的对象,清理事件循环用到的资源;
  • cleanup,它清理之前建立的 fd-handler 映射,清理遗留的处理器并释放资源;
  • run,它是线程池运行函数,windows 上是通过 GetQueuedCompletionStatus 在 iocp 上等待;在 linux 上是通过 epoll_wait 在 epoll 上等待事件。当有事件产生后,根据事件类型,分别调用 do_accept / on_accept、do_recv / on_read、do_error / on_error 回调来分派事件;
  • listen,创建侦听 socket 并加入到 iocp 或 epoll 中;
  • connect,连接到远程服务并将成功连接的 socket 加入到 iocp 或  epoll 中;
  • timeout,设置定时器事件,windows 上是通过 CreateTimerQueueTimer 实现定时器超时;linux 则是通过 timer_create 实现的,都是系统现成的东西,只不过在系统定时器到期后,给对应的 iocp 或 epoll 对象发送了一个通知而已,在 linux 上这个通知机制是上面提到过的 pipe 来实现的,因而有一定延迟,不能指定精度太小的定时器;
  • cancel_timer,取消之前设置的定时器。

然后看下 GEventHandler 提供的回调接口,应用可以从它派生并完成业务相关代码:

EventHandler.h

 #pragma once
#include "platform.h" #ifdef WIN32
// must ensure <winsock2.h> precedes <widnows.h> included, to prevent winsock2.h conflict with winsock.h
# include <WinSock2.h>
# include <Windows.h>
# include <mswsock.h> // for LPFN_ACCEPTEX & LPFN_GETACCEPTEXSOCKADDRS later in EventBase.h
#else
# include <unistd.h> // for close
# include <sys/socket.h>
# include <sys/epoll.h>
# include <sys/time.h>
# include <netinet/in.h> // for struct sockaddr_in
# include <arpa/inet.h> // for inet_addr/inet_ntoa
# include <string.h> // for memset/memcpy
# include <signal.h>
#endif #include <mutex>
#include "jsoncpp/json.h" class GEventHandler;
struct GEV_PER_TIMER_DATA;
class IEventBase
{
public:
#ifdef WIN32
virtual HANDLE iocp () const = ;
#else
virtual int epfd () const = ;
#endif virtual void* timeout(int due_msec, int period_msec, void *arg, GEventHandler *exist_handler) = ;
virtual bool cancel_timer(void* tid) = ;
virtual bool post_timer(GEV_PER_TIMER_DATA *gptd) = ;
}; #ifdef WIN32
enum GEV_IOCP_OP
{
OP_TIMEOUT = ,
OP_ACCEPT,
OP_RECV,
};
#else
// the purpose of this key is to distinguish different connections with same fd !
// (when connection break and re-established soon, fd may not change but port will change)
struct conn_key_t
{
int fd;
unsigned short lport;
unsigned short rport; conn_key_t (int f, unsigned short l, unsigned short r);
bool operator< (struct conn_key_t const& rhs) const;
};
#endif struct GEV_PER_HANDLE_DATA
{
SOCKET so;
SOCKADDR_IN laddr;
SOCKADDR_IN raddr; #ifndef WIN32
conn_key_t key () const;
#endif GEV_PER_HANDLE_DATA(SOCKET s, SOCKADDR_IN *l, SOCKADDR_IN *r);
virtual ~GEV_PER_HANDLE_DATA();
}; struct GEV_PER_IO_DATA
{
SOCKET so;
#ifdef WIN32
GEV_IOCP_OP op;
OVERLAPPED ol;
WSABUF wsa; // wsa.len is buffer length
DWORD bytes; // after compeleted, bytes trasnfered
#else
char *buf;
int len;
#endif GEV_PER_IO_DATA(
#ifdef WIN32
GEV_IOCP_OP o,
#endif
SOCKET s, int l);
virtual ~GEV_PER_IO_DATA();
}; struct GEV_PER_TIMER_DATA
#ifdef WIN32
: public GEV_PER_IO_DATA
#endif
{
IEventBase *base;
int due_msec;
int period_msec;
void *user_arg;
bool cancelled;
#ifdef WIN32
HANDLE timerque;
HANDLE timer;
#else
timer_t timer;
#endif GEV_PER_TIMER_DATA(IEventBase *base, int due, int period, void *arg
#ifdef WIN32
, HANDLE tq);
#else
, timer_t tid);
#endif virtual ~GEV_PER_TIMER_DATA();
void cancel ();
}; class GEventHandler
{
public:
GEventHandler();
virtual ~GEventHandler(); GEV_PER_HANDLE_DATA* gphd();
GEV_PER_TIMER_DATA* gptd();
bool connected();
void disconnect();
void clear();
SOCKET fd(); int send(char const* buf, int len);
int send(std::string const& str); virtual bool reuse();
virtual bool auto_reconnect();
virtual void arg(void *param) = ;
virtual void reset(GEV_PER_HANDLE_DATA *gphd, GEV_PER_TIMER_DATA *gptd, IEventBase *base);
virtual bool on_read(GEV_PER_IO_DATA *gpid) = ;
virtual void on_error(GEV_PER_HANDLE_DATA *gphd);
// note when on_timeout called, handler's base may cleared by cancel_timer, use gptd->base instead if it is not null.
virtual bool on_timeout(GEV_PER_TIMER_DATA *gptd) = ;
virtual void cleanup(bool terminal);
void close(bool terminal); protected:
GEV_PER_HANDLE_DATA *m_gphd = nullptr;
GEV_PER_TIMER_DATA *m_gptd = nullptr;
IEventBase *m_base = nullptr;
// us so instead of m_gphd,
// as the later one may destroyed during using..
SOCKET m_so;
}; // a common handler to process json protocol.
class GJsonEventHandler : public GEventHandler
{
public:
//virtual void on_read();
virtual void arg(void *param);
virtual void reset(GEV_PER_HANDLE_DATA *gphd, GEV_PER_TIMER_DATA *gptd, IEventBase *base);
virtual bool on_read(GEV_PER_IO_DATA *gpid);
virtual void on_read_msg(Json::Value const& root) = ;
virtual bool on_timeout(GEV_PER_TIMER_DATA *gptd);
virtual void cleanup(bool terminal); protected:
// protect m_stub to prevent multi-entry
#ifdef HAS_ET
std::mutex m_mutex;
#endif std::string m_stub;
};

这里主要有两个类,GEventHandler 处理通用的基于流的数据;GJsonEventHandler 处理基于 json 格式的数据。

前者需要重写 on_read 方法来处理块数据;后者需要重写 on_read_msg 方法来处理 json 数据。

目前 json 的解析是通过 jsoncpp 库完成的,这个库本身是跨平台的(本 git 库仅提供 64 位 Linux 静态链接库及 VS2013 的 32 位 Release 版本 Windows 静态库)。

svc_handler 与 clt_handler  均从 GJsonEventHandler 派生。

如果有新的流格式需要处理 ,只需要从 GEventHandler 类派生新的处理类即可。

除了读取连接上的数据,还有其它一些重要的回调接口,列明如下:

  • on_read,连接上有数据到达;
  • on_error,连接断开;
  • on_tmeout,定时器事件;
  • ……

如果有新的事件需要处理 ,也可以在这里扩展。

最后看下 GEventBaseWithAutoReconnect 提供的与自动重连相关的接口:

EventBaseAR.h

 #pragma once

 #include "EventBase.h"
#include <thread> #define GEV_RECONNECT_TIMEOUT 2 // seconds
#define GEV_MAX_RECONNECT_TIMEOUT 256 // seconds class GEventBaseWithAutoReconnect : public GEventBase
{
public:
GEventBaseWithAutoReconnect(int reconn_min = GEV_RECONNECT_TIMEOUT, int reconn_max = GEV_MAX_RECONNECT_TIMEOUT);
~GEventBaseWithAutoReconnect(); bool do_connect(unsigned short port, void *arg);
GEventHandler* connector(); protected:
virtual void on_error(GEventHandler *h);
virtual bool on_timeout(GEV_PER_TIMER_DATA *gptd); virtual void on_connect_break();
virtual bool on_connected(GEventHandler *app); protected:
void do_reconnect(void *arg); protected:
unsigned short m_port;
GEventHandler* m_app;
GEventHandler* m_htimer;
void* m_timer;
int m_reconn_min;
int m_reconn_max;
int m_reconn_curr;
};

其实比较简单,只比 GEventBase 类多了一个  do_connect 方法,来扩展 connect 不能自动重连的问题。

底层的话,是通过定时器来实现指数后退重连算法的。


后记

这个框架已经应用到我司的公共产品中,并为数个 tcp 服务提供底层支撑,经过百万级别用户机器验证,运行稳定性还是可以的,所以当得起“工业级”这三个字。

前面在说到开源库的选型时还留了一个口子没有交待,这里一并说下。

其实最早的重构版本是使用 libevent 来实现的,但是发现它在 windows 上使用的是低效的 select,

而且为了增加、删除句柄,它又使用了一种 self-pipe-trick 的技巧,简单说来的就是下面的代码序列:

listen (listen_fd, 1);
……
connect (connect_fd, &addr, size);
……
accept_fd = accept (listen_fd, &addr, &size);

在缺乏 pipe 调用的 win32 环境制造了一个 socket 自连接,从而进行一些通知。

这一步是必要的,如果不能成功连接就会导致整个 libevent 初始化失败,从而运行不起来。

不巧的是,在一些 windows 机器上(约占用户总量 10%),由于防火墙设置严格,上述 listen 与 connect 调用可以成功,

但是 accept 会失败返回,从而导致整个服务退出 (防火墙会严格禁止不在白名单上侦听的端口的连接)。

对于已知端口,可以通过在防火墙上设置白名单来避免,但是对于这种随机 listen 的端口,真的是太难了,基本无解。

回头考察了一下 asio,windows 上使用的是 iocp,自然没有这个自连接;

ACE 有多种实现可供选择,如果使用  ACE_Select_Reactor / ACE_TP_Reactor 是会有这个自连接,

但是你可以选择其它实现,如基于 WaitForMultipleEvents 的 ACE_WFMO_Reactor(最大只支持 62 个句柄,放弃),

或基于 iocp 的 ACE_Proactor (前摄式,与反应式在编程上稍有不同,更接近于 asio)就没有这个自连接。

再说的深一点,其实公司最早的网络库使用的就是基于 boost 的 asio,大量的使用了 c++ 模板,

有时候产生了一些崩溃,但是根据 dump 完全无法定位崩溃点(各种冗长的模板展开名称),

导致了一些顽固的已知 bug 一起找不到崩溃点而无法解决(虽然量不大),所以才有了要去重新选型网络库以及后来这一系列的东西。

本来一开始我是想用 ACE 的,因为我读过这个库的源码,对里面所有的东西都非常熟悉,

但是看看 ACE 小 5 MB 的 dll 尺寸,还是放弃了(产品本身安装包也就这么大吧),

对于一个公司底层的公共组件,被各种产品携带,需要严格控制“体重”

(后来听说 ACE 按功能拆分了代码模块,你只需要选自己依赖的部分即可,不过我还没有试过)。

使用这个库代替之前的 boost::asio 后,我还有一个意外收获,就是编译出来的 dll 尺寸明显小了很多,700 K -> 500 K 的样子,看来所谓模板膨胀是真有其事……

最后奉上 gevent 的 github 链接,欢迎有相同需求的小伙伴前来“复刻” :

https://github.com/goodpaperman/gevent