RabbitMQ - TcpConnection析构引发的一次handshake_timeout

时间:2023-03-09 04:00:59
RabbitMQ - TcpConnection析构引发的一次handshake_timeout

使用RabbitMQ时,连接rabbit-server一直连接失败,代码没有任何错误提示。但是通过rabbitmqctl始终查询不到连接以及创建的queue等信息。

官方的文件demo里面也没有TcpConnection相关例子,只在github上有些简单说明。

然而网上几乎所有人都依然还是在使用Connection,几乎没有使用TcpConnection的例子。最后还是放弃了网络求助,老老实实看源码定位了。

使用tcpdump确认,代码这边的TcpConnection确实是已经向rabbit-server发出了连接请求。

RabbitMQ - TcpConnection析构引发的一次handshake_timeout

开始观察发现三次握手是已经建立了连接的,但是几秒后,rabbit-server主动发送返回了一个RST包。这非常诧异,查看rabbit-server日志看到,产生了一次handshake_timeout错误。

RabbitMQ - TcpConnection析构引发的一次handshake_timeout

现在可以确认,不是鉴权产生的问题,而是在连接时就已经失败了,在完成连接到RST包收到刚好过了10s时间。在官方文档查阅到,rabbit-server的心跳也刚好是10s。

后来还是确定问题点是在代码上,但是代码只有短短几行从github上copy下来的,怎么会出错呢。

最后在日志打印上发现monitor函数执行了两次,这个小小的信息感觉看到了问题的原因,查看TcpConnection源码monitor被调用的地方。

 public:
/**
* Constructor
* @param connection Parent TCP connection object
* @param socket The socket filedescriptor
* @param buffer The buffer that was already built
* @param handler User-supplied handler object
*/
TcpConnected(TcpConnection *connection, int socket, TcpOutBuffer &&buffer, TcpHandler *handler) :
TcpState(connection, handler),
_socket(socket),
_out(std::move(buffer)),
_in()
{
// if there is already an output buffer, we have to send out that first
if (_out) _out.sendto(_socket); // tell the handler to monitor the socket, if there is an out
_handler->monitor(_connection, _socket, _out ? readable | writable : readable);
} /**
* Destructor
*/
virtual ~TcpConnected() noexcept
{
// we no longer have to monitor the socket
_handler->monitor(_connection, _socket, ); // close the socket
close(_socket);
}

在构造和析构中各调用了一次,而且内部使用connection可能是为了提高效率进行了线程操作,也就是说实际的connection是在多线程中完成的。

最后尝试修改代码,使用指针进行操作,因为代码并不是github上的单个函数文件,而是多处引用,最后问题解决。成功使用TcpConnection连接上了rabbit-server。

附上简单代码:

 int Broker::init(std::string host,int port, std::string username, std::string userpasswd, int svrid)
{
// create an instance of your own tcp handler
_handle = new DSBrokerMessageHandle(); // address of the server
AMQP::Address address(host, port,AMQP::Login(username,userpasswd),"/"); // create a AMQP connection object
_connection = new AMQP::TcpConnection(_handle, address);

// and create a channel
_channel = new AMQP::TcpChannel(&connection); auto receiveMessageCallback = [=](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
//_channel->ack(deliveryTag);
}; AMQP::QueueCallback callback =
[=](const std::string &name, int msgcount, int consumercount)
{
_channel->bindQueue("service", name, name);
_channel->bindQueue("service", name, "monitor");
_channel->bindQueue("service", name, "heartbeat"); _channel->consume(name, AMQP::noack).onReceived(receiveMessageCallback);
}; AMQP::SuccessCallback success = [svrid, this, callback]()
{
char que[] = { '\0' };
ACE_OS::itoa(svrid, que, );
std::string quename(que);
_channel->declareQueue(quename, AMQP::durable).onSuccess(callback);
}; // use the channel object to call the AMQP method you like
_channel->declareExchange("service", AMQP::fanout).onSuccess(success); return ;
}