boost.asio源码剖析(三)

时间:2022-09-09 11:42:14

* 常见流程分析之一(Tcp异步连接)

 

我们用一个简单的demo分析Tcp异步连接的流程:

 1 #include <iostream>
2 #include <boost/asio.hpp>
3
4 // 异步连接回调函数
5 void on_connect(boost::system::error_code ec)
6 {
7 if (ec) // 连接失败, 输出错误码
8 std::cout << "async connect error:" << ec.message() << std::endl;
9 else // 连接成功
10 std::cout << "async connect ok!" << std::endl;
11 }
12
13 int main()
14 {
15 boost::asio::io_service ios; // 创建io_service对象
16 boost::asio::ip::tcp::endpoint addr(
17 boost::asio::ip::address::from_string("127.0.0.1"), 12345); // server端地址
18 boost::asio::ip::tcp::socket conn_socket(ios); // 创建tcp协议的socket对象
19 conn_socket.async_connect(addr, &on_connect); // 发起异步连接请求
20 ios.run(); // 调用io_service::run, 等待异步操作结果
21
22 std::cin.get();
23 return 0;
24 }

 


这段代码中的异步连接请求在asio源码中的序列图如下:
boost.asio源码剖析(三)

 

      其中,basic_socket是个模板类,tcp协议中的socket的定义如下:
            typedef basic_socket<tcp> socket;


      reactor的定义如下:
      #if defined(BOOST_ASIO_WINDOWS_RUNTIME)
            typedef class null_reactor reactor;
      #elif defined(BOOST_ASIO_HAS_IOCP)
            typedef class select_reactor reactor;
      #elif defined(BOOST_ASIO_HAS_EPOLL)
            typedef class epoll_reactor reactor;
      #elif defined(BOOST_ASIO_HAS_KQUEUE)
            typedef class kqueue_reactor reactor;
      #elif defined(BOOST_ASIO_HAS_DEV_POLL)
            typedef class dev_poll_reactor reactor;
      #else
            typedef class select_reactor reactor;
      #endif

      在这个序列图中最值得注意的一点是:在windows平台下,异步连接请求不是由Iocp处理的,而是由select模型处理的,这是与异步读写数据最大的不同之处。

 


* 常见流程分析之二(Tcp异步接受连接)

 

我们用一个简单的demo分析Tcp异步连接的流程:

 1 #include <iostream>
2 #include <boost/asio.hpp>
3 #include <boost/bind.hpp>
4
5 // 异步连接回调函数
6 void on_accept(boost::system::error_code ec, boost::asio::ip::tcp::socket * socket_ptr)
7 {
8 if (ec) // 连接失败, 输出错误码
9 std::cout << "async accept error:" << ec.message() << std::endl;
10 else // 连接成功
11 std::cout << "async accept from (" << socket_ptr->remote_endpoint() << ")" << std::endl;
12
13 // 断开连接, 释放资源.
14 socket_ptr->close(), delete socket_ptr;
15 }
16
17 int main()
18 {
19 boost::asio::io_service ios; // 创建io_service对象
20 boost::asio::ip::tcp::endpoint addr(
21 boost::asio::ip::address::from_string("0.0.0.0"), 12345); // server端地址
22 boost::asio::ip::tcp::acceptor acceptor(ios, addr, false); // 创建acceptor对象
23 boost::asio::ip::tcp::socket * socket_ptr = new boost::asio::ip::tcp::socket(ios);
24 acceptor.async_accept(*socket_ptr
25 , boost::bind(&on_accept, boost::asio::placeholders::error, socket_ptr)); // 调用异步accept请求
26 ios.run(); // 调用io_service::run, 等待异步操作结果
27
28 std::cin.get();
29 return 0;
30 }

 

这段代码中的异步连接请求在asio源码中的序列图如下:
boost.asio源码剖析(三)

 


* 常见流程分析之三(Tcp异步读写数据)

 

我们依然以上一节的例子为基础,扩展一个简单的demo分析Tcp异步读写数据的流程:

 1 #include <iostream>
2 #include <boost/asio.hpp>
3 #include <boost/bind.hpp>
4 #include <boost/shared_ptr.hpp>
5 #include <boost/array.hpp>
6
7 typedef boost::shared_ptr<boost::asio::ip::tcp::socket> socket_ptr_t;
8 typedef boost::array<char, 128> buffer_t;
9 typedef boost::shared_ptr<buffer_t> buffer_ptr_t;
10
11 // 异步读数据回调函数
12 void on_read(boost::system::error_code ec
13 , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
14 {
15 if (ec)
16 std::cout << "async write error:" << ec.message() << std::endl;
17 else
18 {
19 std::cout << "async read size:" << len;
20 std::cout << " info:" << std::string((char*)buffer_ptr->begin(), len) << std::endl;
21
22 // auto release socket and buffer.
23 }
24 }
25
26 // 异步写数据回调函数
27 void on_write(boost::system::error_code ec
28 , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
29 {
30 if (ec)
31 std::cout << "async write error:" << ec.message() << std::endl;
32 else
33 {
34 std::cout << "async write size:" << len << std::endl;
35 socket_ptr->async_read_some(boost::asio::buffer(buffer_ptr.get(), buffer_t::size())
36 , boost::bind(&on_read, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
37 , socket_ptr, buffer_ptr));
38 }
39 }
40
41 // 异步连接回调函数
42 void on_accept(boost::system::error_code ec, socket_ptr_t socket_ptr)
43 {
44 if (ec) // 连接失败, 输出错误码
45 {
46 std::cout << "async accept error:" << ec.message() << std::endl;
47 }
48 else // 连接成功
49 {
50 std::cout << "async accept from (" << socket_ptr->remote_endpoint() << ")" << std::endl;
51 buffer_ptr_t buffer_ptr(new buffer_t);
52 strcpy_s((char*)buffer_ptr->begin(), buffer_t::size(), "abcdefg");
53 socket_ptr->async_write_some(boost::asio::buffer(buffer_ptr.get(), strlen((char*)buffer_ptr->begin()))
54 , boost::bind(&on_write, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
55 , socket_ptr, buffer_ptr));
56 }
57 }
58
59 int main()
60 {
61 boost::asio::io_service ios; // 创建io_service对象
62 boost::asio::ip::tcp::endpoint addr(
63 boost::asio::ip::address::from_string("0.0.0.0"), 12345); // server端地址
64 boost::asio::ip::tcp::acceptor acceptor(ios, addr, false); // 创建acceptor对象
65 socket_ptr_t socket_ptr(new boost::asio::ip::tcp::socket(ios));
66 acceptor.async_accept(*socket_ptr
67 , boost::bind(&on_accept, boost::asio::placeholders::error, socket_ptr)); // 调用异步accept请求
68 ios.run(); // 调用io_service::run, 等待异步操作结果
69
70 std::cout << "press enter key...";
71 std::cin.get();
72 return 0;
73 }

 

这段代码中的异步连接请求在asio源码中的序列图如下:
boost.asio源码剖析(三)

 


* 常见流程分析之四(Tcp强制关闭连接)

我们依然以上一节的例子为基础,扩展一个简单的demo分析Tcp强制关闭连接的流程:

 1 #include <iostream>
2 #include <boost/asio.hpp>
3 #include <boost/bind.hpp>
4 #include <boost/shared_ptr.hpp>
5 #include <boost/array.hpp>
6
7 typedef boost::shared_ptr<boost::asio::ip::tcp::socket> socket_ptr_t;
8 typedef boost::array<char, 128> buffer_t;
9 typedef boost::shared_ptr<buffer_t> buffer_ptr_t;
10
11 // 异步读数据回调函数
12 void on_read(boost::system::error_code ec
13 , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
14 {
15 if (ec) // 连接失败, 输出错误码
16 {
17 std::cout << "async read error:" << ec.message() << std::endl;
18 }
19 }
20
21 // 异步写数据回调函数
22 void on_write(boost::system::error_code ec
23 , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
24 {
25 if (ec) // 连接失败, 输出错误码
26 {
27 std::cout << "async write error:" << ec.message() << std::endl;
28 }
29 }
30
31 // 异步连接回调函数
32 void on_accept(boost::system::error_code ec, socket_ptr_t socket_ptr)
33 {
34 if (ec) // 连接失败, 输出错误码
35 {
36 std::cout << "async accept error:" << ec.message() << std::endl;
37 }
38 else // 连接成功
39 {
40 std::cout << "async accept from (" << socket_ptr->remote_endpoint() << ")" << std::endl;
41
42 {
43 buffer_ptr_t buffer_ptr(new buffer_t);
44 strcpy_s((char*)buffer_ptr->begin(), buffer_t::size(), "abcdefg");
45 socket_ptr->async_write_some(boost::asio::buffer(buffer_ptr.get(), strlen((char*)buffer_ptr->begin()))
46 , boost::bind(&on_write, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
47 , socket_ptr, buffer_ptr));
48 }
49
50 {
51 buffer_ptr_t buffer_ptr(new buffer_t);
52 socket_ptr->async_read_some(boost::asio::buffer(buffer_ptr.get(), buffer_t::size())
53 , boost::bind(&on_read, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
54 , socket_ptr, buffer_ptr));
55 }
56
57 /// 强制关闭连接
58 socket_ptr->close(ec);
59 if (ec)
60 std::cout << "close error:" << ec.message() << std::endl;
61 }
62 }
63
64 int main()
65 {
66 boost::asio::io_service ios; // 创建io_service对象
67 boost::asio::ip::tcp::endpoint addr(
68 boost::asio::ip::address::from_string("0.0.0.0"), 12345); // server端地址
69 boost::asio::ip::tcp::acceptor acceptor(ios, addr, false); // 创建acceptor对象
70 socket_ptr_t socket_ptr(new boost::asio::ip::tcp::socket(ios));
71 acceptor.async_accept(*socket_ptr
72 , boost::bind(&on_accept, boost::asio::placeholders::error, socket_ptr)); // 调用异步accept请求
73 socket_ptr.reset();
74 ios.run(); // 调用io_service::run, 等待异步操作结果
75
76 std::cout << "press enter key...";
77 std::cin.get();
78 return 0;
79 }

这个例子中,接受到客户端的连接后,立即发起异步读请求和异步写请求,然后立即强制关闭socket。

其中,强制关闭socket的请求在asio源码中的序列图如下:
boost.asio源码剖析(三)

 

* 常见流程分析之五(Tcp优雅地关闭连接)

我们依然以第三节的例子为基础,扩展一个简单的demo分析Tcp优雅地关闭连接的流程:

 1 #include <iostream>
2 #include <boost/asio.hpp>
3 #include <boost/bind.hpp>
4 #include <boost/shared_ptr.hpp>
5 #include <boost/array.hpp>
6
7 typedef boost::shared_ptr<boost::asio::ip::tcp::socket> socket_ptr_t;
8 typedef boost::array<char, 32> buffer_t;
9 typedef boost::shared_ptr<buffer_t> buffer_ptr_t;
10
11
12 // 异步读数据回调函数
13 void on_read(boost::system::error_code ec
14 , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
15 {
16 static int si = 0;
17 if (ec) // 连接失败, 输出错误码
18 {
19 std::cout << "async read(" << si++ << ") error:" << ec.message() << std::endl;
20 socket_ptr->shutdown(boost::asio::socket_base::shutdown_receive, ec);
21 socket_ptr->close(ec);
22 if (ec)
23 std::cout << "close error:" << ec.message() << std::endl;
24 }
25 else
26 {
27 std::cout << "read(" << si++ << ") len:" << len << std::endl;
28
29 socket_ptr->async_read_some(boost::asio::buffer(buffer_ptr.get(), buffer_t::size())
30 , boost::bind(&on_read, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
31 , socket_ptr, buffer_ptr));
32 }
33 }
34
35 // 异步写数据回调函数
36 void on_write(boost::system::error_code ec
37 , std::size_t len, socket_ptr_t socket_ptr, buffer_ptr_t buffer_ptr)
38 {
39 if (ec) // 连接失败, 输出错误码
40 {
41 std::cout << "async write error:" << ec.message() << std::endl;
42 }
43 else
44 {
45 /// 优雅地关闭连接
46 socket_ptr->shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);
47 if (ec)
48 std::cout << "shutdown send error:" << ec.message() << std::endl;
49 }
50 }
51
52 // 异步连接回调函数
53 void on_accept(boost::system::error_code ec, socket_ptr_t socket_ptr)
54 {
55 if (ec) // 连接失败, 输出错误码
56 {
57 std::cout << "async accept error:" << ec.message() << std::endl;
58 }
59 else // 连接成功
60 {
61 std::cout << "async accept from (" << socket_ptr->remote_endpoint() << ")" << std::endl;
62
63 {
64 buffer_ptr_t buffer_ptr(new buffer_t);
65 socket_ptr->async_read_some(boost::asio::buffer(buffer_ptr.get(), buffer_t::size())
66 , boost::bind(&on_read, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
67 , socket_ptr, buffer_ptr));
68 }
69
70 {
71 buffer_ptr_t buffer_ptr(new buffer_t);
72 strcpy_s((char*)buffer_ptr->begin(), buffer_t::size(), "abcdefg");
73 socket_ptr->async_write_some(boost::asio::buffer(buffer_ptr.get(), strlen((char*)buffer_ptr->begin()))
74 , boost::bind(&on_write, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred
75 , socket_ptr, buffer_ptr));
76 }
77 }
78 }
79
80 int main()
81 {
82 boost::asio::io_service ios; // 创建io_service对象
83 boost::asio::ip::tcp::endpoint addr(
84 boost::asio::ip::address::from_string("0.0.0.0"), 12345); // server端地址
85 boost::asio::ip::tcp::acceptor acceptor(ios, addr, false); // 创建acceptor对象
86 socket_ptr_t socket_ptr(new boost::asio::ip::tcp::socket(ios));
87 acceptor.async_accept(*socket_ptr
88 , boost::bind(&on_accept, boost::asio::placeholders::error, socket_ptr)); // 调用异步accept请求
89 socket_ptr.reset();
90 ios.run(); // 调用io_service::run, 等待异步操作结果
91
92 std::cout << "press enter key...";
93 std::cin.get();
94 return 0;
95 }

 

      在这个例子中,接收到客户端的连接并向客户端发送数据以后,先关闭socket的发送通道,然后等待socket接收缓冲区中的数据全部read出来以后,再关闭socket的接收通道。此时,socket的接收和发送通道均以关闭,任何进程都无法使用此socket收发数据,但其所占用的系统资源并未释放,底层发送缓冲区中的数据也不保证已全部发出,需要在此之后执行close操作以便释放系统资源。
      若在释放系统资源前希望底层发送缓冲区中的数据依然可以发出,则需在socket的linger属性中设置一个等待时间,以便有时间等待发送缓冲区中的数据发送完毕。但linger中的值绝对不是越大越好,这是因为其原理是操作系统帮忙保留socket的资源以等待其发送缓冲区中的数据发送完毕,如果远端socket的一直未能接收数据便会导致本地socket一直等待下去,这对系统资源是极大的浪费。因此,在需要处理大量连接的服务端,linger的值一定不可过大。

 

 

由于本文会实时根据读者反馈的宝贵意见更新,为防其他读者看到过时的文章,因此本系列专题谢绝转载!