ACE - Reactor实现I/O,Dispatch,Service三层完整服务器(完结)

时间:2023-03-09 01:37:29
ACE - Reactor实现I/O,Dispatch,Service三层完整服务器(完结)

框架描述

服务器层次:

ACE - Reactor实现I/O,Dispatch,Service三层完整服务器(完结)

  • I/O层:对应具体的文件描述符处理,对应ACE中的handle。
  • Dispatch层:事件分发,将I/O事件分发到对应绑定的处理队列等待业务处理,对应ACE中的Event_handle。
  • 业务层:处理具体业务,包含一组线程或进程,并发处理业务。对应ACE中的ACE_Task。

三层结构与五层网络的网络层,传输层,应用层类似对应。

Reactor模式:

ACE - Reactor实现I/O,Dispatch,Service三层完整服务器(完结)

  • I/O处理:ACE_Reactor使用select复用完成,将注册进去的IOhandle进行事件监听。
  • 消息队列:ACE_Task中包含一个消息队列。I/O产生事件后执行绑定的Event函数将消息插入对应的消息队列。
  • 服务进程:ACE_Task内可以构造一个线程池,获取消息队列进行业务并发处理。

下面是Reactor代码实现,包含I/O、dispatch、反应器、和线程池:

 /*-----------------------------------------------------------------
* filename: Reactor.cpp
* author: bing
* time: 2016-06-29 15:26
* function: using ACE Reactor implement I/O multiplex server,
* include service thread pool.
*-----------------------------------------------------------------*/
#include <ace/INET_Addr.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/SOCK_Stream.h>
#include <ace/Reactor.h>
#include <ace/Log_Msg.h>
#include "ace/Task.h"
#include "ace/OS.h"
#include <list> #define MAX_BUFF_SIZE 1024
#define LISTEN_PORT 5010
#define SERVER_IP ACE_LOCALHOST
#define THREAD_NUM 10 struct MsgData
{
ACE_HANDLE* IOHandle;
int DataFlag;
char Data[MAX_BUFF_SIZE];
MsgData()
{
IOHandle = NULL;
DataFlag = -;
ACE_OS::memset(Data, , sizeof(Data));
}
}; class TaskThread; class ServerStream : public ACE_Event_Handler
{
public:
ServerStream(TaskThread* pMsgQueue);
~ServerStream();
ACE_SOCK_Stream& GetStream(){return m_Svr_stream;} //给accept提供接口绑定数据通道
virtual int handle_input(ACE_HANDLE fd); //I/O触发事件后调用
void close();
virtual ACE_HANDLE get_handle(void) const {return m_Svr_stream.get_handle();} //不重载需要手动将handle传入ACE_Reactor
private:
ACE_SOCK_Stream m_Svr_stream;
TaskThread* m_MsgQueue;
}; std::list<ServerStream*> g_StreamPool; //stream pool class TaskThread: public ACE_Task<ACE_MT_SYNCH>
{
public:
virtual int svc(void)
{
ACE_Message_Block *Msg;// = new ACE_Message_Block();
while()
{
getq(Msg); //空闲线程阻塞 ACE_Data_Block *Data_Block = Msg->data_block();
MsgData *pData = reinterpret_cast <MsgData*>(Data_Block->base());
if ( == pData->DataFlag)
{
std::list<ServerStream*>::iterator it;
for (it = g_StreamPool.begin();it != g_StreamPool.end();++it)
{
if (get_handle() == (*it)->get_handle())
{
g_StreamPool.erase(it);
delete *it;
break;
}
}
return ;
}
char strBuffer[MAX_BUFF_SIZE];
ACE_OS::memset(strBuffer, , sizeof(strBuffer));
ACE_OS::memcpy(strBuffer, pData->Data, sizeof(strBuffer));
/*
这里接口业务代码分发数据
*/
ACE_DEBUG((LM_INFO,"[time:%d]recevie msg:%s\n",(int)ACE_OS::time(),strBuffer));
//ACE_SOCK_Stream Stream(*(pData->IOHandle));
//Stream.send("server recive data!\n",sizeof("server recive data!")); //响应client数据
//ACE_OS::sleep(1); //模拟业务耗时
Msg->release(); //release,inclue data_block
//ACE_DEBUG((LM_INFO,"thread end queue count:%d\n",msg_queue_->message_count()));
}
return ;
}
};
typedef ACE_Singleton<TaskThread, ACE_Thread_Mutex> TaskThreadPool; ServerStream::ServerStream(TaskThread* pMsgQueue)
{
m_MsgQueue = pMsgQueue;
} ServerStream::~ServerStream()
{
close();
} /*------------------------------------------------------
* IO上报流数据,使用select复用上报,这里单线程处理
* 原来考虑直接把IO插队列给线程池处理,但是线程池和
* 这里是异步操作,线程没有处理队列这条消息ACE底层会
* 一直上报这个IO插消息队列,暂时在这里做单线程revc
* 考虑epoll边沿触发,一次上报处理
*------------------------------------------------------*/
int ServerStream::handle_input(ACE_HANDLE fd)
{
MsgData Message;
char strBuffer[MAX_BUFF_SIZE];
Message.DataFlag = m_Svr_stream.recv(strBuffer,MAX_BUFF_SIZE); //获取数据回select响应避免反复通知
if (- == Message.DataFlag)
{
ACE_DEBUG((LM_INFO, ACE_TEXT("recive data error!\n")));
return -;
}
else if( == Message.DataFlag)
{
close();
ACE_DEBUG((LM_INFO, ACE_TEXT("client closed!\n")));
}
ACE_Data_Block *Data_Block = new ACE_Data_Block; //线程做释放
ACE_HANDLE Cli_IO = get_handle(); Message.IOHandle = &Cli_IO;
ACE_OS::memcpy(Message.Data,strBuffer,sizeof(strBuffer));//传的data可带length信息来适配消息大小 char *p = reinterpret_cast <char*>(&Message);
Data_Block->base(p,sizeof(Message));
ACE_Message_Block* msg = new ACE_Message_Block(Data_Block);
m_MsgQueue->putq(msg); //put
//Data_Block->release();
return ;
} void ServerStream::close()
{
m_Svr_stream.close();
ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
} class ServerAcceptor : public ACE_Event_Handler
{
public:
ServerAcceptor(int port,char* ip);
~ServerAcceptor();
bool open();
virtual int handle_input(ACE_HANDLE fd); //有client连接
void close();
virtual ACE_HANDLE get_handle(void) const {return m_Svr_aceept.get_handle();}
private:
ACE_INET_Addr m_Svr_addr;
ACE_SOCK_Acceptor m_Svr_aceept;
}; ServerAcceptor::ServerAcceptor(int port,char* ip):m_Svr_addr(port,ip)
{
if (!open()) //open listen port
{
ACE_DEBUG((LM_INFO, ACE_TEXT("open failed!\n")));
}
else
{
ACE_DEBUG((LM_INFO, ACE_TEXT("open success!\n")));
TaskThreadPool::instance()->activate(THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED , THREAD_NUM);//创建10个线程处理业务
}
} ServerAcceptor::~ServerAcceptor()
{
close();
std::list<ServerStream*>::iterator it;
for (it = g_StreamPool.begin();it != g_StreamPool.end();++it)
{
if (NULL != (*it))
{
(*it)->close();
delete (*it);
}
}
} bool ServerAcceptor::open()
{
if (- == m_Svr_aceept.open(m_Svr_addr,))
{
ACE_DEBUG((LM_ERROR,ACE_TEXT("failed to accept\n")));
m_Svr_aceept.close();
return false;
}
return true;
} int ServerAcceptor::handle_input(ACE_HANDLE fd )
{
ServerStream *stream = new ServerStream(TaskThreadPool::instance()); //产生新通道
if (NULL != stream)
{
g_StreamPool.push_back(stream);//暂时存储全局变量用于内存管理,优化可增加一个连接管理类管理连接通道
}
if (m_Svr_aceept.accept(stream->GetStream()) == -) //绑定通道
{
printf("accept client fail\n");
return -;
}
ACE_Reactor::instance()->register_handler(stream,ACE_Event_Handler::READ_MASK); //通道注册到ACE_Reactor
ACE_DEBUG((LM_INFO,"User connect success!,ClientPool num = %d\n",g_StreamPool.size()));
return ;
} void ServerAcceptor::close()
{
ACE_Reactor::instance()->remove_handler(this,ACE_Event_Handler::ACCEPT_MASK);
m_Svr_aceept.close();
} int ACE_TMAIN()
{
ServerAcceptor server(LISTEN_PORT,(char *)SERVER_IP);
ACE_Reactor::instance()->register_handler(&server,ACE_Event_Handler::ACCEPT_MASK); //listen port注册到ACE_Reactor ACE_Reactor::instance()->run_reactor_event_loop(); //进入消息循环,有I/O事件回调handle_input
return ;
}

代码实现了最简单的完整并发服务器,有部分还值得思考和优化:

1.dispatch进行类封装

2.回话通道的数据流管理进行类封装

3.dispatch消息结构优化

4.dispatch处为单线程,直接传递I/O给线程获取数据流还是获取数据流完成后给线程,如何实现两个线程同步

5.底层I/O复用使用epoll边沿优化

6.业务buff处理优化,进行消息类型划分,进入不同业务处理

由于实现完整服务器代码以最简单形式实现,上述优化在实际商用代码中还需要大量封装优化考虑。