boost.asio系列——io_service

时间:2022-09-09 12:33:34

IO模型

io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。

    asio::io_service io_service;
    asio::ip::tcp::socket socket(io_service);

在asio框架中,同步的io主要流程如下:

    boost.asio系列——io_service

  1. 应用程序调用IO对象成员函数执行IO操作
  2. IO对象向io_service 提出请求.
  3. io_service 调用操作系统的功能执行连接操作.
  4. 操作系统向io_service 返回执行结果.
  5. io_service将错误的操作结果翻译为boost::system::error_code类型,再传递给IO对象.
  6. 如果操作失败,IO对象抛出boost::system::system_error类型的异常.

而异步IO的处理流程则有些不同:

    boost.asio系列——io_service

  1. 应用程序调用IO对象成员函数执行IO操作
  2. IO对象请求io_service的服务
  3. io_service 通知操作系统其需要开始一个异步连接.
  4. 操作系统指示连接操作完成, io_service从队列中获取操作结果
  5. 应用程序必须调用io_service::run()以便于接收结果
  6. 调用io_service::run()后,io_service返回一个操作结果,并将其翻译为error_code,传递到事件回调函数中

io_service对象

io_servuce的作用: io_servie 实现了一个任务队列,这里的任务就是void(void)的函数。Io_servie最常用的两个接口是post和run,post向任务队列中投递任务,run是执行队列中的任务,直到全部执行完毕,并且run可以被N个线程调用。Io_service是完全线程安全的队列。

io_service对象提供的接口有run、run_one、poll、poll_one、stop、reset、dispatch、post,最常用的是run、post、stop:

  1. post用于发布io事件,如timer,socket读写等,一般由asio框架相应对象调用,无需我们显式调用。
  2. run用于监听io事件响应,并执行响应回调,对于异步io操作需要在代码中显式调用,对于同步io操作则由io对象隐式调用(并不是run函数,不过也是等待io事件)。

可见,io_service提供的是一个生产者消费者模型。在异步io操作中需要我们手动控制消费者,调用run函数,它的基本工作模式如下:

  1. 等待io事件响应,如果所有io事件响应完成则退出
  2. 等待到io事件响应后,执行其对应的回调
  3. 继续等待下一个io事件,重复1-2

Io_servie 实现代码的基本类结构:

l Io_servie是接口类,为实现跨平台,采用了策略模式,所有接口均有impl_type实现。根据平台不同impl_type分为

n win_iocp_io_service Win版本的实现,这里主要分析Linux版本。

n task_io_service 非win平台下的实现,其代码结构为:

u detail/task_io_service_fwd.hpp 简单声明task_io_service名称

u detail/task_io_service.hpp 声明task_io_service的方法和属性

u detail/impl/task_io_service.ipp 具体实现文件

u 队列中的任务类型为opertioan,原型其实是typedef task_io_service_operation operation,其实现文件在detail/task_io_service_operation.hpp中,当队列中的任务被执行时,就是task_io_service_operation:: complete被调用的时候。

Io_servie::Post方法的实现

Post向队列中投递任务,然后激活空闲线程执行任务。其实现流程如下:

l Post接收handler作为参数,实际上是个仿函数,通过此仿函数构造出completion_handler对象,completion_handler继承自operation。然后调用post_immediate_completion。

l post_immediate_completion首先将outstanding_work_增加,然后调用post_deferred_completion。

l post_deferred_completion首先加锁将任务入列,然后调用wake_one_thread_and_unlock

l wake_one_thread_and_unlock尝试唤醒当前空闲的线程,其实现中特别之处在于,若没有空闲线程,但是有线程在执行task->run,即阻塞在epoll_wait上,那么先中断epoll_wait执行任务队列完成后再执行epoll_wait。

l first_idle_thread_维护了所有当前空闲线程,实际上使用了Leader/Follower模式,每次唤醒时只唤醒空闲线程的第一个。

Io_servie::run方法的实现

Run方法执行队列中的所有任务,直到任务执行完毕。

l run方法首先构造一个idle_thread_info,和first_idle_thread_类型相同,即通过first_idle_thread_将所有线程串联起来,它这个串联不是立即串联的,当该线程无任务可做是加入到first_idle_thread_的首部,有任务执行时,从first_idle_thread_中断开。这很正常,因为first_idle_thread_维护的是当前空闲线程。

l 加锁,循环执行do_one方法,直到do_one返回false

l do_one每次执行一个任务。首先检查队列是否为空,若空将此线程追加到first_idle_thread_的首部,然后阻塞在条件变量上,直到被唤醒。

l 当被唤醒或是首次执行,若stopped_为true(即此时stop方法被调用了),返回0

l 队列非空,pop出一个任务,检查队列无任务那么简单的解锁,若仍有,调用wake_one_thread_and_unlock尝试唤醒其他空闲线程执行。然后执行该任务,返回1.

l 实际上在执行队列任务时有一个特别的判断if (o == &task_operation_),那么将会执行task_->run,task_变量类型为reactor,在linux平台实现为epoll_reactor,实现代码文件为detail/impl/epoll_reactor.ipp,run方法实际上执行的是epoll_wait,run阻塞在epoll_wait上等待事件到来,并且处理完事件后将需要回调的函数push到io_servie的任务队列中,虽然epoll_wait是阻塞的,但是它提供了interrupt函数,该interrupt是如何实现的呢,它向epoll_wait添加一个文件描述符,该文件描述符中有8个字节可读,这个文件描述符是专用于中断epoll_wait的,他被封装到select_interrupter中,select_interrupter实际上实现是eventfd_select_interrupter,在构造的时候通过pipe系统调用创建两个文件描述符,然后预先通过write_fd写8个字节,这8个字节一直保留。在添加到epoll_wait中采用EPOLLET水平触发,这样,只要select_interrupter的读文件描述符添加到epoll_wait中,立即中断epoll_wait。很是巧妙。!!!实际上就是因为有了这个reactor,它才叫io_servie,否则就是一个纯的任务队列了。

l Run方法的原则是:

n 有任务立即执行任务,尽量使所有的线程一起执行任务

n 若没有任务,阻塞在epoll_wait上等待io事件

n 若有新任务到来,并且没有空闲线程,那么先中断epoll_wait,先执行任务

n 若队列中有任务,并且也需要epoll_wait监听事件,那么非阻塞调用epoll_wait(timeout字段设置为0),待任务执行完毕在阻塞在epoll_wait上。

n 几乎对线程的使用上达到了极致。

n 从这个函数中可以知道,在使用ASIO时,io_servie应该尽量多,这样可以使其epoll_wait占用的时间片最多,这样可以最大限度的响应IO事件,降低响应时延。但是每个io_servie::run占用一个线程,所以io_servie最佳应该和CPU的核数相同。

Io_servie::stop的实现

l 加锁,调用stop_all_threads

l 设置stopped_变量为true,遍历所有的空闲线程,依次唤醒

l task_interrupted_设置为true,调用task_的interrupt方法

l task_的类型为reactor,在run方法中已经做了分析

 

从中可以看出,io_service是一个工作队列的模型。在使用过程中一般有如下几个需要注意的地方:

1. run函数在io事件完成后会退出,导致后续基于该对象的异步io任务无法执行

由于io_service并不会主动常见调度线程,需要我们手动分配,常见的方式是给其分配一个线程,然后执行run函数。但run函数在io事件完成后会退出,线程会终止,后续基于该对象的异步io任务无法得到调度。

解决这个问题的方法是通过一个asio::io_service::work对象来守护io_service。这样,即使所有io任务都执行完成,也不会退出,继续等待新的io任务。

    boost::asio::io_service io;
    boost::asio::io_service::work work(io);
    io.run();

2. 回调在run函数的线程中同步执行,当回调处理时间较长时阻塞后续io响应

解决这个问题的方法有两种:1. 启动多线程执行run函数(run函数是线程安全的),2. 新启动一个线程(或通过线程池)来执行回调函数。一般来讲,如果回调处理事件不是特别短,应该使用在线程池中处理回调的方式。

3. 回调在run函数的线程中同步执行,io事件较多的时候得不到及时响应

这个其实是性能问题了,在多核cpu上可以通过在多个线程中执行run函数来解决这一问题。这种方式也只能充分利用cpu性能,本身性能问题就不是光靠软件就能解决的。

.net中的异步io调度方式

和io_service这种手动控制的方式比起来,.net则是纯粹的自动档了。IO调度由CLR托管了,无需手动控制。回调也是在线程池中执行,无需担心影响后续IO响应。

正是由于CLR的托管,在.net 的异步IO框架中,就没有类似io_service的调度对象存在,这也符合.net的一贯简洁做法。

 

◆boost::asio::io_service使用时的注意事项:

①请让boost::asio::io_service和boost::asio::io_service::work搭配使用。

②想让event按照进入(strand)时的顺序被执行,需要boost::asio::io_service要和boost::asio::io_service::strand搭配使用

③一般情况下,io_service的成员函数的使用顺序:

boost::asio::io_service构造,
boost::asio::io_service::run(),
boost::asio::io_service::stop(),
boost::asio::io_service::reset(),
boost::asio::io_service::run(),
......
boost::asio::io_service析构,

④不论有没有使用io_service::work,run()都会执行完io_service里面的event,(若没有用work,run就会退出)。
⑤一个新创建的io_service不需要执行reset()函数。
⑥在调用stop()后,在调用run()之前,请先调用reset()函数。
⑦函数stop()和reset()并不能清除掉io_service里面尚未执行的event。
我个人认为,也只有析构掉io_service,才能清空它里面的那些尚未执行的event了。(可以用智能指针)。

⑧函数stop(),stopped(),reset(),很简单,请单步调试,以明白它在函数里做了什么。

⑨boost的.hpp文件里面(一般情况下)有各个函数的使用说明,你可以随时查看。

◆下面是boost::asio::io_service的stop()和reset()函数的注释的翻译:

void boost::asio::io_service::stop();
BOOST_ASIO_DECL void stop();
/// Stop the io_service object's event processing loop.
/// 停止io_service对象的事件处理循环。
/**
 * This function does not block, but instead simply signals the io_service to
 * stop. All invocations of its run() or run_one() member functions should
 * return as soon as possible. Subsequent calls to run(), run_one(), poll()
 * or poll_one() will return immediately until reset() is called.
 */
 /**
 这个函数不阻塞,而是仅仅表示io_service停止了。
 它的run()或run_one()成员函数的调用应当尽快返回。
 对run()、run_one()、poll()、poll_one()的随后的调用将会立即返回直到reset()函数被调用了。
 */
void boost::asio::io_service::reset();
BOOST_ASIO_DECL void reset();
/// Reset the io_service in preparation for a subsequent run() invocation.
/// 重置io_service对象,为随后的run()调用做准备。
/**
 * This function must be called prior to any second or later set of
 * invocations of the run(), run_one(), poll() or poll_one() functions when a
 * previous invocation of these functions returned due to the io_service
 * being stopped or running out of work. After a call to reset(), the
 * io_service object's stopped() function will return @c false.
 *
 * This function must not be called while there are any unfinished calls to
 * the run(), run_one(), poll() or poll_one() functions.
 */
 /**
 io_service被停止,或者执行完handler而缺乏工作时,run()、run_one()、poll()、poll_one()函数的调用会被返回。
 这些函数在被调用之前,必须先调用reset函数。
 在reset函数被调用后,io_service对象的stopped函数将会返回false。
 当run()、run_one()、poll()、poll_one()函数的任何的调用未结束时,这个函数一定不能被调用。
 */

◆对stop()和reset()函数的一点说明(是我单步调试时看到的):

在Windows下,boost::asio::io_service类里面有一个数据成员为"stopped_"(Flag to indicate whether the event loop has been stopped.)。它是一个标志,它标志着事件循环是不是被stopped了。而boost::asio::io_service::reset()函数仅仅是赋值"stopped_=0"。boost::asio::io_service::stopped()函数仅仅是判断"0!=stopped_"的真假。你单步调试一下,就什么都知道了。

◆下面是我验证boost::asio::io_service的一个例子:

 1 #include <boost/asio.hpp>  
 2 #include <boost/thread.hpp>  
 3 #include <boost/atomic.hpp>  
 4 #include <boost/shared_ptr.hpp>  
 5 #include <boost/date_time/posix_time/ptime.hpp>  
 6 #include <boost/date_time.hpp>//boost::posix_time::to_iso_extended_string()需要此头文件。  
 7   
 8 //boost::atomic_bool coutFlag = false;  
 9 //error C2440: 'initializing' : cannot convert from 'bool' to 'boost::atomics::atomic<bool>'  
10 //故意写错,可以根据错误信息知道某类型的详细信息。  
11 boost::atomic_bool g_coutFlag(false);  
12 boost::atomic_int g_numIn(0);  
13 boost::atomic_int g_numOut(0);  
14   
15 boost::thread_group g_thgp;  
16 boost::asio::io_service g_io;  
17 boost::shared_ptr<boost::asio::io_service::work> g_pWork = \  
18 boost::shared_ptr<boost::asio::io_service::work>(new boost::asio::io_service::work(g_io));  
19 boost::asio::io_service::strand g_strand(g_io);  
20 std::vector<boost::posix_time::ptime> g_vecTimes;  
21   
22 void my_run_4_io_service(boost::asio::io_service& _io, int _idx)  
23 {  
24     _io.run();  
25     //想得到boost::asio::io_service::run()退出时的时刻,只能对io_service进行封装了。  
26     g_vecTimes[_idx] = boost::posix_time::microsec_clock::local_time();  
27 }  
28   
29 void outFun(int idx)  
30 {// io_service执行的handler。  
31     ++g_numOut;  
32     if (g_coutFlag.load())  
33         std::cout << "outFun: index=" << idx << std::endl;  
34     boost::this_thread::sleep_for(boost::chrono::milliseconds(500));  
35 }  
36   
37 void inFun()  
38 {  
39     for (int i = 1; i <= 10; ++i)  
40     {  
41         g_strand.post(boost::bind(outFun, i));  
42         ++g_numIn;  
43         boost::this_thread::sleep_for(boost::chrono::milliseconds(100));  
44     }  
45     g_coutFlag = true;  
46     g_io.stop();//调用它后,不论io_service有没有使用io_service::work类,各个线程的run()都会立即返回。  
47     g_vecTimes[0] = boost::posix_time::microsec_clock::local_time();  
48     int numDelta = g_numIn - g_numOut;  
49     std::cout << "inFun: numDelta=" << numDelta << std::endl;//还剩多少event没有被执行。  
50 }  
51   
52 int main()  
53 {  
54     int vecNum = 5;  
55     g_vecTimes.reserve(vecNum); g_vecTimes.resize(vecNum);  
56     //一个容纳 void fun(int i) 函数的 function对象。  
57     boost::function<void(int)> my_lambda_function_object = [vecNum](int secs)  
58     {  
59         boost::this_thread::sleep_for(boost::chrono::microseconds(1000 * 1000 * secs));  
60         std::cout << "now, time is " << boost::posix_time::  
61             to_iso_extended_string(boost::posix_time::microsec_clock::local_time()) << std::endl;  
62         for (int i = 0; i < vecNum; ++i)  
63             std::cout << i << " : " << boost::posix_time::to_iso_extended_string(g_vecTimes[i]) << std::endl;  
64     };  
65   
66     for (int i = 1; i < vecNum; ++i)  
67         g_thgp.create_thread(boost::bind(my_run_4_io_service, boost::ref(g_io), i));  
68     g_thgp.create_thread(inFun);  
69     //等待5秒,确保执行完毕我设计的那些操作。  
70     my_lambda_function_object(5);  
71     //析构掉io_service对应的io_service::work对象,此时io_service里面还有event。  
72     g_pWork = nullptr;  
73     boost::this_thread::sleep_for(boost::chrono::milliseconds(1000 * 1));  
74     g_io.reset();  
75     boost::this_thread::sleep_for(boost::chrono::seconds(1));  
76     //因为work被析构掉了,所以启动的那些线程在执行完event后,都自行退出了。  
77     for (int i = 1; i < vecNum; ++i)  
78         g_thgp.create_thread(boost::bind(my_run_4_io_service, boost::ref(g_io), i));  
79     //等待6秒,确保io_service中剩余的event被执行完毕。  
80     my_lambda_function_object(6);  
81     std::cout << "done." << std::endl;  
82     int cmd_val = getchar();  
83     return 0;  
84 }