[置顶] 基于Boost.Asio的异步通信服务器设计与开发

时间:2022-09-09 12:37:49


boost::asio 通讯服务器实践

 

1. 开发环境搭建

1.1. Asio准备

万事开头难。对于一个C++的陌生者,编译一个开源的代码并不是一件轻松愉快的事情。为使大家在审阅和检测本代码可使用性时没有必要花费太多时间和精力去编译Boost::Asio库,在此把我在编译boost库时不愉快的经历记录下来,供参考。使用Asio有两种方式,一种是直接使用Asio库,到网站:

https://sourceforge.net/projects/asio/files/asio/1.11.0%20%28Development%29/可以下载单独的Asio库,目前最新版本为1.11。另一种是使用boost.asio库,该库依赖于Boost.System库,因此需要把下载后的boost源码编译生成相应的库才能使用。Boost下载地址http://www.boost.org/,目前最新版本为1.61.0. 我下载的版本为1.61.0版本。下载后解压到E:\boost1610目录下.

1.2. Qt准备

Qt官网上下载Qt。我电脑环境是联想TIANYI 100,4G内存,安装的windwos10笔记本开发,因此下载的是windows版本的Qt5.7版本,安装比较简单,按照提示实行“下一步”就可以。

1.3. 编译boost

编译boost比较麻烦一点,使用Qt自带的mingw编译boost步骤如下:

  1. 配置mingw的系统环境变量PATH。我把QT安装在C盘下,path值配置为:C:\Qt\Qt5.6.1\Tools\mingw492_32\bin。配置后再命令行中输入gcc -v 如果配置成功,则显示为:(配置完后,我试着在命令行中输入gcc -v,居然提示gcc不是内部命令。然后把电脑重起,再输入gcc -v,这次显示成功)。

  2. 下载boost后,解压到任意目录下我的解压目录为E:\boost1610;

  3. 打开cmd.exe,切换到E:\boost1610目录下,在命令行中实行bootstrap.bat gcc 生成b2.exe和bjam.exe。Bootstrap.bat 是在Windows下生成自己的bjam.exe 和b2.exe 的脚本。 在Linux下是bootstrap.sh.  Bootstrap.bat gcc命令实行完成后,会在QT解压的目录下生成bjam.exe 和b2.exe这两个文件.

    [置顶]        基于Boost.Asio的异步通信服务器设计与开发


     

  4. 实行b2.exe开始编译。这个命令会生成一个bin.v2的文件夹,为下一步通过bjam命令提供链接。

  5. 在命令行窗口下实行bjam "toolset=gcc" install编译boost源代码。表示将编译结果默认保存到C:\boost目录下或者指定目录 bjam.exe “-sTools=mingw” --prefix=d:\boost install 或者 Bjam.exe --prefix=d:\boost toolset=gcc install 来编译boost。编译完成后会看到D:\boost文件夹下面有两个子目录 include 和lib。 实行这个命令过程非常耗时,慢慢等待。

1.4. 使用boost库

方法一:

1、选择将这两个目录拷贝到mingw目录下的include和lib目录中,或者将这两个目录的路径添加到系统路径中。

2、由于在windows下系统默认的命名方式为libboost-iostreams-mgw44-mt-1-61-0.a 可以改为libboost-iostreams.a.  Mt 表示多线程,默认。

方法二:

建立Qt项目的时候,都会生成一个XXX.pro的文件,在该文件下增加如下代码:

#DEFINES += BOOST_THREAD_USE_LIB

INCLUDEPATH += E:\Boost\include\boost-1_61

#DEPENDPATH += E:\Boost\include\boost-1_61

LIBS += E:\Boost\lib\libboost_system-mgw49-mt-1_61.a

 

LIBS += -lpthread libwsock32 libws2_32

其中最后一行

LIBS += -lpthread libwsock32 libws2_32表示要用到windows的完成端口模型下的winsocket库。

建议使用方法二。

到此,开发环境基本搭建完成。下面开始Boost.Asio设计开发服务器软件。

2. Asio通讯服务器设计

2.1. 设计指标

  1. 跨平台,一套代码在不修改原代码基础上可运行于windows和Linux系统;

  2. 运行于Linux系统上时采用EPOLL模型,运行于Windows上时采用完成端口模型(IOCP),且并发连接量尽可能接近操作系统IO模型提供的理论并发量;

  3. 能接入任何通过TCP方式连接的客户端,不受任何具体协议数据包的限制;

  4. 客户端发送数据的时候,要求服务器应答时,服务器应答客户端时间应尽可能小,最低要求小于100毫秒。

  5. 有效管理服务器与客户端连接的生命周期

  6. 基础通讯与业务逻辑解耦。

2.2. 概要设计

由于跨平台要求,目前C++适合跨平台的第三方库有libevent、asio库等,我们决定采用asio库实现。服务程序由通讯服务类、连接类、连接管理类、业务接口模板类等四大类组成。为适应不同的客户端接入,服务程序不解析任何数据包,只接受客户端连接和接收客户端发送过来的数据。客户端连接成功后,调用连接器管理对象保存连接,同时向操作系统投递异步读事件,当客户端发送的数据到达服务器,调用连接器中的读事件读取数据,通过业务接口将数据传递到业务接口,数据的处理由业务接口进一步处理。客户端需要服务器应答的,由业务接口根据客户端与服务器端通讯协议格式构造应答数据包后,由连接器转发到客户端。

通讯服务类(TTcpServer):提供基础的端口监听监听,不处理任何业务逻辑,以达到通讯服务类通用的目的。对用户暴露接口有:

  1. SetIpAddress设置监听的IP地址和端口。

  2. SetRecycleSoketTime设置回收掉线的Socket时间间隔。

  3. Start启动方法。启动方法允许用户使用单线程运行和多线程运行,也可以在启动的同时注册自己的业务对象。线程数量由通讯服务类由运行服务的物理机CPU核心数决定,线程数量最大为CPU*2-1.不允许用户自行开辟线程。

  4. RegisterBussiness注册业务对象方法。通过RegisterBussiness方法注册用户通过实现业务接口类完成自己的业务逻辑。

    连接器类(TConnector):连接器类是通讯服务程序的核心,客户端与服务端连接成功后,会产生一个连接会话通道,该通道我们称为连接器,客户通过连接器发送数据到服务端,服务端通过连接器发送数据到客户端。每个连接器有一个唯一标识符ConnectorId,通过客户端唯一标识符ClientId建立与客户端的关联。用户可以通过查询连接器ConnectorId或者ClientId获取指定的连接器。连接器的主要作用就是读取数据或者下发数据到客户端,读取到的数据通过业务接口传递给业务处理类,由业务处理类处理数据。

    连接器管理类:连接器管理类提供保存连接器的容器,作为通讯内部管理类,提供连接器插入到容器、关闭连接器、删除连接器、更改连接器客户端ID和连接器工作参数。

    业务接口类:为用户提供业务处理模板。用户通过实行业务接口实现自己的业务逻辑。

    流程描述:

    [置顶]        基于Boost.Asio的异步通信服务器设计与开发 [置顶]        基于Boost.Asio的异步通信服务器设计与开发

2.3. 详细设计

曾经用Delphi写过windows完成端口模型的通讯代码,而C++,尚未入门,临时抱佛脚采用QT编辑器尝试编写本案例详细设计和具体实现,很多命名规则还保留有一些记忆中的Delphi风格。为方便专业级别的C++程序员审阅本案例,也因本人不喜欢阅读那些带下划线的代码,本详细设计命名规则解释如下:

  1. 所有的类名均用大写的T字母开头。

  2. public类成员变量统一由大写的字母开头。成员变量由两个单词组成的,每个单词的首字母均采用大写。

  3. Protect或者Private成员变量m+单词首字母命名。

  4. Public方法名,只有一个单词的由单词的第一个字母大写,其他字母小写。 由2个单词组成的,每个单词第一个字母采用大写,其他用小写。

  5. Protect或者private方法,均采用首个单词全部小写,其他单词的第一个字母大写。

  6. 除非不得已,本设计常量定义尽量采用const,不使用宏定义。

    通讯服务类、连接器类、连接器管理类、业务逻辑接口,业务逻辑示例TDefeaultBussiness类详细设计及各对象之间的关系如下图所示。

          [置顶]        基于Boost.Asio的异步通信服务器设计与开发

2.3.1. 通讯服务器类(TTcpserver

一、通讯服务器工作流程:

  1. 使用TTcpServer::TTcpServer()构造器生成一个TTcpserver对象,如TTcpserver  tcpserver; 或者使用带ip地址和端口号参数的构造器TTcpserver::TTcpServer(string ipAddr,int port)生成一个TTcpServer对象。

  2. 如果还没设置监听的IP地址和端口的,调用SetIpAddress设置监听IP地址和端口。

  3. 调用 RegisterBussiness方法绑定用户自己实现的业务逻辑对象。

  4. 调用SetRecycleSocketTime方法设置回收断线的客户端连接,该步骤可选,如果用户不调用该方法,则默认10分钟回收一次。

  5. 调用Start方法启动服务,参数为True时将根据运行的物理服务器CPU核数产生2*CPU核数-1的线程数量运行通讯服务。

  1. 功能描述:

  1. waitAccept方法:使用异步方式等待远程客户端连接,当有连接请求时。如果连接成功,调用连接成功回调函数来保存连接和投递异步读等待。

  2. acceptHandle方法:是waitAccept方法的回调函数,连接成功后该方法被调用,保存连接器和投递异步读事件。 如果有异常,则把异常写到异常日志中。

  3. Start方法:启动服务。首先调用waitAccept等待连接,然后调用连接器管理对象中的OpenTimers方法启动连接器回收掉线的连接器,最后调用采用线程方式启动mIoServer.run。

     

2.3.2. 连接器类

功能描述:

  1. DoRead:投递异步读事件等待数据接收,如果读到客户端关闭的错误代码,则关闭连接。

  2. ReadHandle:异步读事件回调函数,当读取到数据后,通过业务接口把数据传递到业务处理对象。再次调用DoRead投递读事件

  3. DoWrite:投递异步写事件,由于什么时候在连接器上面进行写操作是由客户端或者应用协议决定的,投递写事件由业务逻辑接口调用。

  4. writeHandle:异步写回调函数,通过该回调函数将写结果通知给业务层。

2.3.3. 连接器管理类

功能描述:

  1. AddConnector方法:为连接器生成连接器ID,并把连接器对象保存到连接器容器中。工作流程:

    首先中连接器ID回收站中获取可以利用的ID,如果没有,则通过增长方式建立一个ID。

  2. 其次调用容器的插入方法保存连接器对象。

  3. GetSocket方法:通过连接器ID或者客户端ID获取连接器。

  4. UpdataConnector方法:当客户端注册后,更新连接器的ClientId。由业务接口调用。

  5. checkConnection方法:定时检查连接器容器中是否有掉线的连接器,并将掉线的连接器中删除掉线的连接器。

   源代码:


/***********************************************

* 撰写时间:2016年8月12日

*C++等级:初级

*作者:

*/

#ifndef TCPSERVER_H

#define TCPSERVER_H

#include <boost/asio.hpp>

#include <boost/bind.hpp>

#include <boost/thread.hpp>

#include "connector.h"

#include "connectormanager.h"

#include "userbusiness.h"

#include <string>

using namespacestd;

class TTcpServer

{

   public:

    //提供默认的构造函数

    TTcpServer();

    //提供通过IP和端口构造服务类的构造函数

    TTcpServer(conststringipAddress,shortport);

   //~TTcpServer(){}

    voidSetIpAddress(conststringipAddr,shortport);

    voidStart(constboolmultiThread=false);

    //结束服务

    voidStop();

    //注册业务

    voidRegisterBussiness(IBussiness&business);

    voidSetRecycleSocketTime(intsecond);

  private:

    boost::asio::io_servicemIoServer;

    boost::asio::ip::tcp::endpointmEndpoint;

    boost::asio::ip::tcp::acceptormAcceptor;

    TConnectorManagermConnectorManager;

    //线程组,保存的是线程地址,本例不适应,学习参考

    boost::thread_groupmWorkers;

    //线程组,用智能指针保存,自动化管理

    std::vector<boost::shared_ptr<boost::thread>>mThreads;

    //业务接口

    IBussiness*pmBussiness;

    intmRecycleTime;

    //连接处理器

    voidacceptHandle(boost::shared_ptr<TConnector>pConnection,constboost::system::error_code&error);

    //监听端口,等待连接

    voidwaitAccept();

    //创建线程池

     voidopenThreadPool();

};

#endif //TCPSERVER_H



#include"boost/enable_shared_from_this.hpp"

//#include <boost/thread.hpp>

#include <boost/thread/thread.hpp>

#include "tcpserver.h"

#include "userbusiness.h"

TTcpServer::TTcpServer()

    :mAcceptor(mIoServer),

     mConnectorManager(mIoServer),

     mRecycleTime(60)

{

 

}

TTcpServer::TTcpServer(conststringipAddress,shortport)

    :mEndpoint(boost::asio::ip::address_v4::from_string(ipAddress),port),

     mAcceptor(mIoServer,mEndpoint),mConnectorManager(mIoServer)

{

}

//等待连接

void TTcpServer::waitAccept()

{

    //构造一个虚拟的连接对象

    boost::shared_ptr<TConnector>pConnection(newTConnector(mIoServer,pmBussiness));

    //等待客户端连接,连接成功将连接信息保存到pConnection中

    mAcceptor.async_accept(pConnection->GetSocket(),

    boost::bind(&TTcpServer::acceptHandle,this,pConnection,boost::asio::placeholders::error()));

}

//连接事件回调函数

void TTcpServer::acceptHandle(boost::shared_ptr<TConnector>pConnection,constboost::system::error_code&err)

{

    if(!err)

    {

       try

        {

            //再次监听连接

            waitAccept();

            //投递读准备

           if(NULL!=pConnection)

           {

               //pConnection->InitSocket();

                pConnection->DoRead();

                //本次连接保存到连接器容器中

                mConnectorManager.AddConnector(pConnection);

           }

        }

        catch(std::exception&e)

        {

            //写异常日志

        }

        catch(...)

        {

            //写异常日志

        }

    }

}


//使用默认的构造函数创建TcpServer时,还没有制定服务器的IP地址和端口,需要该函数设置服务程序运行的IP和端口

void TTcpServer::SetIpAddress(conststringipAddr,shortport)

{

    boost::asio::ip::tcp::endpointtmpEp(boost::asio::ip::address_v4::from_string(ipAddr),port);

    mEndpoint=tmpEp;

    mAcceptor.open(mEndpoint.protocol());

    mAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));

    mAcceptor.bind(tmpEp);

    mAcceptor.listen();

}

void TTcpServer::Start(constboolmultiThread)

{

    waitAccept();

    if(multiThread)//启动多线程

    {

       openThreadPool();

    }

    else

    {

       //以非阻塞、单线程方式运行服务

        boost::shared_ptr<boost::thread>aThread(newboost::thread(boost::bind(&boost::asio::io_service::run,&mIoServer)));

        mThreads.push_back(aThread);

     }

    mConnectorManager.OpenTimer(mRecycleTime);

}

void TTcpServer::RegisterBussiness(IBussiness&business)

{

    business.BindConnectorManager(mConnectorManager);

    pmBussiness=&business;

}


void TTcpServer::openThreadPool()

{

   size_tcpuCount=boost::thread::hardware_concurrency();//获取cpu核数

   for(size_ti=0;i<(cpuCount*2-1);i++)

   {

        boost::shared_ptr<boost::thread>aThread(newboost::thread(boost::bind(&boost::asio::io_service::run,&mIoServer)));

        mThreads.push_back(aThread);

   }

// //阻塞线程,同时会阻塞主线程

// for(std::size_ti=0;i<mThreads.size();++i)

// {

// mThreads[i]->join();

// }

}

void TTcpServer::Stop()

{

    mConnectorManager.CloseAll();

   mAcceptor.close();

    mIoServer.stop();

}


/*
*************************************************************
 * 连接器,也称为会话,是整个通讯服务的核心,数据读写操作均围绕连接器进行
 * 由于ASIO抽象了操作系统,连接器并没有唯一的句柄,构造一个Long类型的数据
 * 标识连接器,维护句柄由管理连接器类实现。
 ********************************************************
*/
#ifndef CONNECTION_H
#define CONNECTION_H
#include "boost/enable_shared_from_this.hpp"
#include <boost/asio.hpp>
#include <boost/detail/atomic_count.hpp>
#include "bussinessinterface.h"
using namespace std;
const int MaxLen=1024;
//连接器,每个客户端连接到服务器时都会产生一个连接,所有的读写操作基都是在一个连接上产生的。
//使用shared_ptr和enable_shared_from_this智能指针管理内存,防止内存泄漏.
class TConnector: public boost::enable_shared_from_this<TConnector>
{
public:
    //客户端ID
    std::string ClientId;
    //连接器唯一标识
    long ConnectorId;
    TConnector(boost::asio::io_service &ioSer, IBussiness* bussiness);
    boost::asio::ip::tcp::socket& GetSocket();
    void InitSocket();
    //投递读数据事件
    void DoRead();
    void DoWrite(const char *pData, UINT32 nDataSize);//
private:
    boost::asio::io_service& mIoService;
    //连接产生的Socket
    boost::asio::ip::tcp::socket mSocket;
    //客户端Ip地址和端口号
    boost::asio::ip::tcp::endpoint mClientAddress;
    //业务逻辑接口
    IBussiness* pmBussiness;
    //接收缓冲区
    char mDataBuffer[MaxLen];
    //最近读取数据时间,可以做为掉线判断依据,由应用层协议决定,如心跳包
    //boost::posix_time::seconds mReadLastTime;
    //异步读事件回调处理
    void readHandle(const boost::system::error_code& ec, size_t bytesTransferred);
    void  writeHandle(const boost::system::error_code& ec);
    void closeSocket();
    void closeSocketHandle();
};
#endif // CONNECTION_H
#include "connector.h"
#include "boost/asio.hpp"
#include "boost/bind.hpp"
#include "boost/enable_shared_from_this.hpp"
TConnector::TConnector(boost::asio::io_service &ioSer, IBussiness* bussiness)
  :mIoService(ioSer),mSocket(ioSer),pmBussiness(bussiness)
{
}
void TConnector::InitSocket()
{
    mSocket.set_option(boost::asio::ip::tcp::acceptor::linger(true, 0));
    mSocket.set_option(boost::asio::socket_base::keep_alive(true));
    //mSocket.set_option(boost::asio::socket_base::);
}
boost::asio::ip::tcp::socket& TConnector::GetSocket()
{
   return mSocket;
}
void TConnector::DoRead()
{
    mSocket.async_read_some(boost::asio::buffer(mDataBuffer,MaxLen),
    boost::bind(&TConnector::readHandle,shared_from_this(),
    boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));
}
void TConnector::readHandle(const boost::system::error_code &ec, size_t bytesTransferred)
{
    if (!ec)
    {
        if (pmBussiness != NULL)
        {
            pmBussiness->OnRecvData(shared_from_this(), mDataBuffer, bytesTransferred);
        }
       // mReadLastTime=boost::posix_time::second_clock::local_time();
        DoRead();
    }else if(ec==boost::asio::error::eof || ec==boost::asio::error::connection_reset)
    {
        closeSocket();
    }
}
void TConnector::DoWrite(const char* pData, UINT32 nDataSize)
{
   boost::asio::async_write(mSocket,boost::asio::buffer(pData,nDataSize),
    boost::bind(&TConnector::writeHandle,shared_from_this(),
    boost::asio::placeholders::error));
}
void TConnector::writeHandle(const boost::system::error_code &ec)
{
    if (!ec)
    {
       if (pmBussiness != NULL)
       {
          //pmBusiness->OnWrite(this, m_dataRecvBuff, bytes_transferred);
       }
    }
}
void TConnector::closeSocket()
{
    mIoService.post(boost::bind(&TConnector::closeSocketHandle, shared_from_this()));
}
void TConnector::closeSocketHandle()
{
    try
    {
        mSocket.close();
        if (pmBussiness != NULL)
        {
            pmBussiness->OnDisConnection();
        }
    }
    catch(std::exception& e)
    {
        //写异常日志
    }
    catch(...)
    {
        //写异常日志
    }
}
/*连接器管理类
 **********************************************
 * 保存连接器、删除掉线的连接器、更新连接器中的客户端ID、获取特定的连接器
 * 撰写时间:2016年8月16日
 * 作者:
 */
#ifndef CONNECTORMANAGER_H
#define CONNECTORMANAGER_H
#include "boost/enable_shared_from_this.hpp"
#include <boost/asio.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/typeof/typeof.hpp>
#include <boost/thread.hpp>
#include "connector.h"
struct SOCKETID{};//多容器SocketId索引标识
struct CLIENTID{};//多容ClientId器索引标识
typedef
 boost::multi_index::multi_index_container
 <
         boost::shared_ptr<TConnector>,
         boost::multi_index::indexed_by
         <
             boost::multi_index::ordered_unique
             <boost::multi_index::tag<SOCKETID>,BOOST_MULTI_INDEX_MEMBER(TConnector, long,ConnectorId)>,
             boost::multi_index::ordered_non_unique
             <boost::multi_index::tag<CLIENTID>,BOOST_MULTI_INDEX_MEMBER(TConnector,string,ClientId)>
         >
 > ConnectorSet;
 //实现参考模板
 template<typename Tag, typename MultiIndexContainer, typename Index>
 MultiIndexContainer FindContainer(const MultiIndexContainer& container, const Index& index)
 {
    const typename boost::multi_index::index<MultiIndexContainer, Tag>::type& indexSet = get<Tag>(container);
    const typename boost::multi_index::index<MultiIndexContainer, Tag>::type::iterator iter = indexSet.find(index);
     if (indexSet.end() == iter)
    {
        index.print("not found");
        return NULL;
    }
    return iter;
}
//连接器管理类
class TConnectorManager
{
public:
    TConnectorManager(boost::asio::io_service &io)
        :mTimer(io,boost::posix_time::seconds(30))
    {}
    void AddConnector(boost::shared_ptr<TConnector> pConnection);
    //由ConnectorId或者ClientId获取连接器
    template<typename Tag,typename Member>
    boost::shared_ptr<TConnector> GetConnetor(Member& m)
    {
       if (mConnectorSet.empty())
       {
           return NULL;
       }
       const typename boost::multi_index::index<ConnectorSet,Tag>::type& tagView = boost::multi_index::get<Tag>(mConnectorSet);
       const typename boost::multi_index::index<ConnectorSet,Tag>::type::iterator iter = tagView.find(m);
        if (tagView.end()!=iter)
        {
            return *iter;
        }else
        {return NULL;}
    }
//    template<typename Tag,typename TagValue>
//    void DeleteConnector(TagValue v)
//    {
//       boost::shared_ptr<TConnector> ac =GetConnetor(v);
//       if (NULL !=ac)
//       mConnectorSet.erase(ac);
//    }
    template<typename Tag,typename IndexValue>
    void UpdataConnection(IndexValue& idv,string clientId)
    {
      boost::shared_ptr<TConnector> acon =GetConnetor<Tag>(idv);
      if (!acon==NULL)
      {
           const_cast<TConnector&>(*acon).ClientId =clientId;
      }
    }
    void OpenTimer(long times);
    void CloseAll();
private:
    typedef boost::shared_lock<boost::mutex> readLock;
    typedef boost::unique_lock<boost::mutex> writeLock;
    static boost::detail::atomic_count mLastId;
    ConnectorSet mConnectorSet;
    //回收掉线的连接器ID.
    std::vector<long> mRecycleId;
    boost::mutex mMutex;
    std::vector<boost::shared_ptr<boost::thread>> mThreads;
    boost::asio::io_service mIoService;
    boost::asio::deadline_timer mTimer;
    long mTimeLength;
    //定时检查连接器,删除已经掉线的连接器
    void checkConnector(const boost::system::error_code &e);
};
#endif // CONNECTORMANAGER_H
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/date_time.hpp>
#include "connectormanager.h"
boost::detail::atomic_count TConnectorManager::mLastId(0);
//如果mRecycleId回收站中有可使用的Id,就取第一个Id作为新连接器的ConnectorId,如果没有产生一个Id
void TConnectorManager::AddConnector(boost::shared_ptr<TConnector> pConnection)
{
   writeLock lock(mMutex);
   long tmpId ;
   if(mRecycleId.empty())
   {
         tmpId = ++TConnectorManager::mLastId;
   }else
   {
       //取第一个ID
       tmpId =mRecycleId[0];
       mRecycleId.erase(mRecycleId.begin()+0);
   }
   pConnection->ConnectorId =tmpId;
   mConnectorSet.insert(pConnection);
}
void TConnectorManager::OpenTimer(long times)
{
    mTimeLength=times;
    mTimer.async_wait(boost::bind(&TConnectorManager::checkConnector,this,boost::asio::placeholders::error));
}
//定时检查连接器容器,清除掉线的连接器,并回收ID。否则回收ID会导致IoServer::run退出
void TConnectorManager::checkConnector(const boost::system::error_code& e)
{
    writeLock Lock(mMutex);
  try
  {
      if (!e)
      {
          ConnectorSet::iterator iter = mConnectorSet.begin();
          while(iter != mConnectorSet.end())
          {
              if (mConnectorSet.size()<2) break;
              if(!((*iter)->GetSocket().is_open()))
              {
                  mRecycleId.push_back((*iter)->ConnectorId);
                  iter =mConnectorSet.erase(iter);
              }
              ++iter;
          }
        } //end if
  }//end try
  catch(std::exception& e)
  {
       // e.what()
  }
  catch(...)
  {
  }
    mTimer.expires_at(mTimer.expires_at()+ boost::posix_time::seconds(mTimeLength));
    mTimer.async_wait(boost::bind(&TConnectorManager::checkConnector,this,boost::asio::placeholders::error));
}
void TConnectorManager::CloseAll()
{
    mIoService.stop();
}
/*
******************************************************************************
*   业务接口。 业务接口主要是为了实现通讯基础设施与业务逻辑的解耦而存在。业务逻辑主要是
*   数据解析、入库保存、数据转发等用户功能需求。
******************************************************************************
*/
#ifndef BUSSINESSINTERFACE_H
#define BUSSINESSINTERFACE_H
class TConnector;
typedef boost::shared_ptr<TConnector> ConnectorPtr;
class TConnectorManager;
//业务接口,主要是协议数据处理或者其他用户需要处理的业务应用
class IBussiness
{
    public:
        virtual void OnNewConnection(ConnectorPtr newSocket) = 0;
        virtual void OnRecvData(ConnectorPtr socket, const char* pData, UINT32 nDataSize) = 0;
        virtual void OnWrite(const char* pData, UINT32 nDataSize)=0;
        virtual void OnDisConnection()=0;
        void BindConnectorManager(TConnectorManager &cnnManager){pmConnectorManager=&cnnManager;}
    protected:
        TConnectorManager *pmConnectorManager;
};
#endif // BUSSINESSINTERFACE_H
/*测试业务用例
 * ********************************************************************
 * ********************************************************************
*/
#ifndef USERBUSINESS_H
#define USERBUSINESS_H
#include <boost/asio.hpp>
#include <boost/enable_shared_from_this.hpp>
#include "bussinessinterface.h"
#include "connectormanager.h"
#include "connector.h"
class TDefaultBussiness:public IBussiness
{
    public:
        void OnNewConnection(ConnectorPtr newSocket) ;
        void OnRecvData(ConnectorPtr socket, const char* pData, UINT32 nDataSize) ;
        void OnWrite(const char* pData, UINT32 nDataSize);
};
#endif // USERBUSINESS_H
/*
 *  业务处理程序例子
 *
 */
#include <boost/enable_shared_from_this.hpp>
#include "userbusiness.h"
#include "bussinessinterface.h"
using namespace std;
void TDefaultBussiness::OnNewConnection(ConnectorPtr newSocket)
{
}
void TDefaultBussiness::OnRecvData(ConnectorPtr socket, const char *pData, UINT32 nDataSize)
{
     long  socketId =socket->ConnectorId;
     //string cid=socket->mClientId;
     stringstream ss;
     ss<<socketId;
     string rs ="From ID:" +ss.str();
     socket->DoWrite(rs.c_str(),rs.length());
     //OnWrite("123456789012345",15);
}
void TDefaultBussiness::OnWrite(const char *pData, UINT32 nDataSize)
{
    //获取应答数据包,从应答数据包中获取SocketId或者ClientId。
    int indx =1;
    ConnectorPtr iter= pmConnectorManager->GetConnetor<SOCKETID>(indx);
    iter->DoWrite(pData,15);
}
void TDefaultBussiness::OnDisConnection()
{
}
使用举例
#ifndef MAINWINDOW_H
#define MAINWINDOW_H
#include <QMainWindow>
namespace Ui {
class MainWindow;
}
class MainWindow : public QMainWindow
{
    Q_OBJECT
public:
    explicit MainWindow(QWidget *parent = 0);
    ~MainWindow();
private slots:
    void on_pushButton_clicked();
    void on_btnStop_clicked();
private:
    Ui::MainWindow *ui;
};
#endif // MAINWINDOW_H
#include "mainwindow.h"
#include "ui_mainwindow.h"
#include "tcpserver.h"
#include "userbusiness.h"
TDefaultBussiness myBussiness;
//TTcpServer TcpServer("127.0.0.1",6868);
TTcpServer TcpServer;
MainWindow::MainWindow(QWidget *parent) :
    QMainWindow(parent),
    ui(new Ui::MainWindow)
{
    ui->setupUi(this);
}
MainWindow::~MainWindow()
{
    delete ui;
}
void MainWindow::on_pushButton_clicked()
{
    TcpServer.SetIpAddress("127.0.0.1",6868);
    TcpServer.RegisterBussiness(myBussiness);
    TcpServer.Start();
}
void MainWindow::on_btnStop_clicked()
{
    TcpServer.Stop();
}