Boost.Asio:每个连接/套接字使用`io_service`是一件好事吗?

时间:2022-09-08 22:12:21

I want to create an application that implements one-thread-per-connection model. But each connection must be stoppable. I have tried this boost.asio example which implements the blocking version of what I want. But after a little bit questioning I've found out that there is no reliable way to stop the session of that example. So I've tried to implement my own. I had to use asynchronous functions. Since I want to make a thread to manage only one connection and there is no way to control which asynchronous job is employed to which thread, I decided to use io_service for each connection/socket/thread.

我想创建一个实现单线程每个连接模型的应用程序。但每个连接必须是可以停止的。我试过这个boost.asio示例,它实现了我想要的阻塞版本。但经过一番询问,我发现没有可靠的方法来阻止该例子的会话。所以我试图实现自己的。我不得不使用异步函数。由于我想创建一个只管理一个连接的线程,并且无法控制将哪个异步作业用于哪个线程,所以我决定对每个连接/套接字/线程使用io_service。

So is it a good approach, do you know a better approach?

那么这是一个好方法,你知道更好的方法吗?

My code is here so you can examine and review it:

我的代码在这里,所以你可以检查和审查它:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/array.hpp>
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>
#include <list>
#include <iostream>
#include <string>
#include <istream>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b  = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket socket_type;

const short PORT = 11235;
class Server;

// A connection has its own io_service and socket
class Connection {
protected:
    ba::io_service service;
    socket_type sock;
    b::thread *thread;
    ba::streambuf stream_buffer;    // for reading etc
    Server *server;
    void AsyncReadString() {
        ba::async_read_until(
            sock,
            stream_buffer,
            '\0',   // null-char is a delimiter
            b::bind(&Connection::ReadHandler, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::string newstr = s + '\0';  // add a null char
        ba::async_write(
            sock,
            ba::buffer(newstr.c_str(), newstr.size()),
            b::bind(&Connection::WriteHandler, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    virtual void Session() {
        AsyncReadString();
        service.run();  // run at last
    }
    std::string ExtractString() {
        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '\0');
        return s;
    }
    virtual void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) {
        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    virtual void WriteHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) {
    }
public:
    Connection(Server *s) :
        service(),
        sock(service),
        server(s),
        thread(NULL)
    {  }
    socket_type& Socket() {
        return sock;
    }
    void Start() {
        if (thread) delete thread;
        thread = new b::thread(
            b::bind(&Connection::Session, this));
    }
    void Join() {
        if (thread) thread->join();
    }
    void Stop() {
        service.stop();
    }
    void KillMe();
    virtual ~Connection() {
    }
};

// a server also has its own io_service but it's only used for accepting
class Server {
public:
    std::list<Connection*> Connections;
protected:
    ba::io_service service;
    acceptor_type acc;
    b::thread *thread;
    virtual void AcceptHandler(const bs::error_code &ec) {
        if (!ec) {
            Connections.back()->Start();
            Connections.push_back(new Connection(this));
            acc.async_accept(
                Connections.back()->Socket(),
                b::bind(&Server::AcceptHandler,
                    this,
                    ba::placeholders::error));
        }
        else {
            // do nothing
            // since the new session will be deleted
            // automatically by the destructor
        }
    }
    virtual void ThreadFunc() {
        Connections.push_back(new Connection(this));
        acc.async_accept(
            Connections.back()->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error));
        service.run();
    }
public:
    Server():
        service(),
        acc(service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(NULL)
    {  }
    void Start() {
        if (thread) delete thread;
        thread = new b::thread(
            b::bind(&Server::ThreadFunc, this));
    }
    void Stop() {
        service.stop();
    }
    void Join() {
        if (thread) thread->join();
    }
    void StopAllConnections() {
        for (auto c : Connections) {
            c->Stop();
        }
    }
    void JoinAllConnections() {
        for (auto c : Connections) {
            c->Join();
        }
    }
    void KillAllConnections() {
        for (auto c : Connections) {
            delete c;
        }
        Connections.clear();
    }
    void KillConnection(Connection *c) {
        Connections.remove(c);
        delete c;
    }
    virtual ~Server() {
        delete thread;
        // connection should be deleted by the user (?)
    }
};

void Connection::KillMe() {
    server->KillConnection(this);
}

int main() {
    try {
        Server s;
        s.Start();
        std::cin.get(); // wait for enter
        s.Stop();   // stop listening first
        s.StopAllConnections(); // interrupt ongoing connections
        s.Join();   // wait for server, should return immediately
        s.JoinAllConnections(); // wait for ongoing connections
        s.KillAllConnections(); // destroy connection objects
        // at the end of scope, Server will be destroyed
    }
    catch (std::exception &e) {
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}

1 个解决方案

#1


3  

No. Using an io_service object per connection is definitely a smell. Especially since you're also running each connection on a dedicated thread.

没有。每个连接使用一个io_service对象绝对是一种气味。特别是因为您还在专用线程上运行每个连接。

At this point you have to ask yourself what did asynchrony buy you? You can have all the code synchronous and have exactly the same number of threads etc.

此时你必须问自己,异步会给你带来什么?您可以使所有代码同步并具有完全相同数量的线程等。

Clearly you want to multiplex the connections onto a far smaller number of services. In practice there are a few sensible models like

显然,您希望将连接多路复用到少得多的服务上。在实践中,有一些明智的模型,如

  1. a single io_service with a single service thread (this is usually good). No tasks queued on the service may ever block for significant time or the latency will suffer

    单个服务线程的io_service(这通常很好)。没有在服务上排队的任务可能会阻塞很长时间或延迟将受到影响

  2. a single io_service with a number of threads executing handlers. The number of threads in the pool should be enough to service the max. number of simultaneous CPU intensive tasks supported (or again, the latency will start to go up)

    单个io_service,其中有许多线程执行处理程序。池中的线程数应足以满足最大值。支持的同时CPU密集型任务的数量(或者再次,延迟将开始上升)

  3. an io_service per thread, usually one thread per logical core and with thread affinity so that it "sticks" to that core. This can be ideal for cache locality

    每个线程一个io_service,通常每个逻辑核心一个线程,并具有线程亲和性,以便它“粘住”到该核心。这可能是缓存局部性的理想选择

UPDATE: Demo

Here's a demo that shows the idiomatic style using option 1. from above:

这是一个演示,使用上面的选项1显示惯用风格:

Live On Coliru

住在科利鲁

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b  = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket   socket_type;

const short PORT = 11235;

// A connection has its own io_service and socket
class Connection : public b::enable_shared_from_this<Connection>
{
public:
    typedef boost::shared_ptr<Connection> Ptr;
protected:
    socket_type    sock;
    ba::streambuf  stream_buffer; // for reading etc
    std::string    message;

    void AsyncReadString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        ba::async_read_until(
            sock,
            stream_buffer,
            '\0',   // null-char is a delimiter
            b::bind(&Connection::ReadHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        message = s;

        ba::async_write(
            sock,
            ba::buffer(message.c_str(), message.size()+1),
            b::bind(&Connection::WriteHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    std::string ExtractString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '\0');
        return s;
    }
    void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) 
    {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    void WriteHandler(const bs::error_code &ec, std::size_t bytes_transferred) {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }
public:
    Connection(ba::io_service& svc) : sock(svc) { }

    virtual ~Connection() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }

    socket_type& Socket() { return sock;          } 
    void Session()        { AsyncReadString();    } 
    void Stop()           { sock.cancel();        }
};

// a server also has its own io_service but it's only used for accepting
class Server {
public:
    std::list<boost::weak_ptr<Connection> > m_connections;
protected:
    ba::io_service _service;
    boost::optional<ba::io_service::work> _work;
    acceptor_type _acc;
    b::thread thread;

    void AcceptHandler(const bs::error_code &ec, Connection::Ptr accepted) {
        if (!ec) {
            accepted->Session();
            DoAccept();
        }
        else {
            // do nothing the new session will be deleted automatically by the
            // destructor
        }
    }

    void DoAccept() {
        auto newaccept = boost::make_shared<Connection>(_service);

        _acc.async_accept(
            newaccept->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error,
                newaccept
            ));
    }

public:
    Server():
        _service(),
        _work(ba::io_service::work(_service)),
        _acc(_service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(b::bind(&ba::io_service::run, &_service))
    {  }

    ~Server() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        Stop();
        _work.reset();
        if (thread.joinable()) thread.join();
    }

    void Start() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        DoAccept();
    }

    void Stop() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        _acc.cancel();
    }

    void StopAllConnections() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        for (auto c : m_connections) {
            if (auto p = c.lock())
                p->Stop();
        }
    }
};

int main() {
    try {
        Server s;
        s.Start();

        std::cerr << "Shutdown in 2 seconds...\n";
        b::this_thread::sleep_for(b::chrono::seconds(2));

        std::cerr << "Stop accepting...\n";
        s.Stop();

        std::cerr << "Shutdown...\n";
        s.StopAllConnections(); // interrupt ongoing connections
    } // destructor of Server will join the service thread
    catch (std::exception &e) {
        std::cerr << __FUNCTION__ << ":" << __LINE__ << "\n";
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }

    std::cerr << "Byebye\n";
}

I modified the main() to run for 2 seconds without user intervention. This is so I can demo it Live On Coliru (of course, it's limited w.r.t the number of client processes).

我修改了main()以运行2秒而无需用户干预。这样我就可以演示Live On Coliru(当然,它对客户端进程的数量有限)。

If you run it with a lot (a lot) of clients, using e.g.

如果您与很多(很多)客户一起运行它,使用例如

$ time (for a in {1..1000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\\0" | netcat localhost 11235)& done; wait)

You will find that the two second window handles them all:

你会发现两个第二个窗口都处理它们:

$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
      2 hello world 28214
      2 hello world 4554
      2 hello world 6216
      2 hello world 7864
      2 hello world 9966
      2 void Server::Stop()
   1000 std::string Connection::ExtractString()
   1001 virtual Connection::~Connection()
   2000 void Connection::AsyncReadString()
   2000 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

If you really go berserk and raise 1000 to e.g. 100000 there, you'll get things similar to:

如果你真的发狂并将1000增加到例如那里有100000,你会得到类似的东西:

sehe@desktop:/tmp$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
      2 hello world 5483
      2 hello world 579
      2 hello world 5865
      2 hello world 938
      2 void Server::Stop()
      3 hello world 9613
   1741 std::string Connection::ExtractString()
   1742 virtual Connection::~Connection()
   3482 void Connection::AsyncReadString()
   3482 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

On repeated 2-second runs of the server.

重复运行2秒钟的服务器。

#1


3  

No. Using an io_service object per connection is definitely a smell. Especially since you're also running each connection on a dedicated thread.

没有。每个连接使用一个io_service对象绝对是一种气味。特别是因为您还在专用线程上运行每个连接。

At this point you have to ask yourself what did asynchrony buy you? You can have all the code synchronous and have exactly the same number of threads etc.

此时你必须问自己,异步会给你带来什么?您可以使所有代码同步并具有完全相同数量的线程等。

Clearly you want to multiplex the connections onto a far smaller number of services. In practice there are a few sensible models like

显然,您希望将连接多路复用到少得多的服务上。在实践中,有一些明智的模型,如

  1. a single io_service with a single service thread (this is usually good). No tasks queued on the service may ever block for significant time or the latency will suffer

    单个服务线程的io_service(这通常很好)。没有在服务上排队的任务可能会阻塞很长时间或延迟将受到影响

  2. a single io_service with a number of threads executing handlers. The number of threads in the pool should be enough to service the max. number of simultaneous CPU intensive tasks supported (or again, the latency will start to go up)

    单个io_service,其中有许多线程执行处理程序。池中的线程数应足以满足最大值。支持的同时CPU密集型任务的数量(或者再次,延迟将开始上升)

  3. an io_service per thread, usually one thread per logical core and with thread affinity so that it "sticks" to that core. This can be ideal for cache locality

    每个线程一个io_service,通常每个逻辑核心一个线程,并具有线程亲和性,以便它“粘住”到该核心。这可能是缓存局部性的理想选择

UPDATE: Demo

Here's a demo that shows the idiomatic style using option 1. from above:

这是一个演示,使用上面的选项1显示惯用风格:

Live On Coliru

住在科利鲁

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b  = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket   socket_type;

const short PORT = 11235;

// A connection has its own io_service and socket
class Connection : public b::enable_shared_from_this<Connection>
{
public:
    typedef boost::shared_ptr<Connection> Ptr;
protected:
    socket_type    sock;
    ba::streambuf  stream_buffer; // for reading etc
    std::string    message;

    void AsyncReadString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        ba::async_read_until(
            sock,
            stream_buffer,
            '\0',   // null-char is a delimiter
            b::bind(&Connection::ReadHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        message = s;

        ba::async_write(
            sock,
            ba::buffer(message.c_str(), message.size()+1),
            b::bind(&Connection::WriteHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    std::string ExtractString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '\0');
        return s;
    }
    void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) 
    {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    void WriteHandler(const bs::error_code &ec, std::size_t bytes_transferred) {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }
public:
    Connection(ba::io_service& svc) : sock(svc) { }

    virtual ~Connection() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }

    socket_type& Socket() { return sock;          } 
    void Session()        { AsyncReadString();    } 
    void Stop()           { sock.cancel();        }
};

// a server also has its own io_service but it's only used for accepting
class Server {
public:
    std::list<boost::weak_ptr<Connection> > m_connections;
protected:
    ba::io_service _service;
    boost::optional<ba::io_service::work> _work;
    acceptor_type _acc;
    b::thread thread;

    void AcceptHandler(const bs::error_code &ec, Connection::Ptr accepted) {
        if (!ec) {
            accepted->Session();
            DoAccept();
        }
        else {
            // do nothing the new session will be deleted automatically by the
            // destructor
        }
    }

    void DoAccept() {
        auto newaccept = boost::make_shared<Connection>(_service);

        _acc.async_accept(
            newaccept->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error,
                newaccept
            ));
    }

public:
    Server():
        _service(),
        _work(ba::io_service::work(_service)),
        _acc(_service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(b::bind(&ba::io_service::run, &_service))
    {  }

    ~Server() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        Stop();
        _work.reset();
        if (thread.joinable()) thread.join();
    }

    void Start() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        DoAccept();
    }

    void Stop() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        _acc.cancel();
    }

    void StopAllConnections() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        for (auto c : m_connections) {
            if (auto p = c.lock())
                p->Stop();
        }
    }
};

int main() {
    try {
        Server s;
        s.Start();

        std::cerr << "Shutdown in 2 seconds...\n";
        b::this_thread::sleep_for(b::chrono::seconds(2));

        std::cerr << "Stop accepting...\n";
        s.Stop();

        std::cerr << "Shutdown...\n";
        s.StopAllConnections(); // interrupt ongoing connections
    } // destructor of Server will join the service thread
    catch (std::exception &e) {
        std::cerr << __FUNCTION__ << ":" << __LINE__ << "\n";
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }

    std::cerr << "Byebye\n";
}

I modified the main() to run for 2 seconds without user intervention. This is so I can demo it Live On Coliru (of course, it's limited w.r.t the number of client processes).

我修改了main()以运行2秒而无需用户干预。这样我就可以演示Live On Coliru(当然,它对客户端进程的数量有限)。

If you run it with a lot (a lot) of clients, using e.g.

如果您与很多(很多)客户一起运行它,使用例如

$ time (for a in {1..1000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\\0" | netcat localhost 11235)& done; wait)

You will find that the two second window handles them all:

你会发现两个第二个窗口都处理它们:

$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
      2 hello world 28214
      2 hello world 4554
      2 hello world 6216
      2 hello world 7864
      2 hello world 9966
      2 void Server::Stop()
   1000 std::string Connection::ExtractString()
   1001 virtual Connection::~Connection()
   2000 void Connection::AsyncReadString()
   2000 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

If you really go berserk and raise 1000 to e.g. 100000 there, you'll get things similar to:

如果你真的发狂并将1000增加到例如那里有100000,你会得到类似的东西:

sehe@desktop:/tmp$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
      2 hello world 5483
      2 hello world 579
      2 hello world 5865
      2 hello world 938
      2 void Server::Stop()
      3 hello world 9613
   1741 std::string Connection::ExtractString()
   1742 virtual Connection::~Connection()
   3482 void Connection::AsyncReadString()
   3482 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

On repeated 2-second runs of the server.

重复运行2秒钟的服务器。