ZeroMQ是否有数据到达时的通知/回调事件/消息?

时间:2022-06-11 00:52:00

I am trying to integrate ZMQ into an existing windows application that relies heavily on MFC sockets (CASyncSocket).

我试图将ZMQ集成到现有的Windows应用程序中,该应用程序严重依赖于MFC套接字(CASyncSocket)。

I've got a CWinThread derived UI thread (without a GUI) that communicates with a server asynchronously using CAsyncSocket. I would like to add a ZMQ inproc communication line to handle communicating the data received from the server (on a REQ/REP basis) to other threads within the application.

我有一个CWinThread派生的UI线程(没有GUI),它使用CAsyncSocket异步与服务器通信。我想添加一个ZMQ inproc通信线来处理从服务器接收的数据(在REQ / REP基础上)与应用程序内的其他线程的通信。

Using CAsyncSocket, the OnReceive method is called by the MFC framework whenever new data is available on the socket to be received (that might be an over-simplification to the hardcore MFC gurus out there).

使用CAsyncSocket时,每当要接收的套接字上有新数据时,MFC框架就会调用OnReceive方法(这可能是对那里的硬核MFC专家的过度简化)。

Is there any such mechanism in ZMQ? Or do I have to add an additional dedicated WorkerThread that the UI thread launches to handle my ZMQ communications to the rest of the app? The traffic on both pipelines is minimal so I really don't want to have to create 2 separate threads if I can get by with 1.

ZMQ中有没有这样的机制?或者我是否必须添加一个额外的专用WorkerThread,UI线程启动以处理我与应用程序其余部分的ZMQ通信?两个管道上的流量都很小,所以我真的不想创建2个单独的线程,如果我可以使用1。

Note, I've got the basics working, I'm just having problems with synchronization. If I use blocking recv/send with ZMQ, it starves out my CAsycSocket because the windows messages never get processed by the thread resulting in sometimes never getting the data from the server that the ZMQ is supposed to be delivering. But if I use non-blocking ZMQ calls, then the thread frequently ends up sitting idle because it doesn't know to read off the ZMQ socket.

注意,我已经掌握了基础知识,我只是遇到了同步问题。如果我使用阻塞recv / send与ZMQ,它会匮乏我的CAsycSocket,因为Windows消息永远不会被线程处理,导致有时永远不会从ZMQ应该传递的服务器获取数据。但是如果我使用非阻塞的ZMQ调用,那么线程经常最终处于空闲状态,因为它不知道要读取ZMQ套接字。

3 个解决方案

#1


1  

Ultimately, the answer is no. There are no current callback/notifications for when data arrives in ZeroMQ that you can link into. I was also unable to find any fork that adds this functionality.

最终,答案是否定的。当数据到达您可以链接到的ZeroMQ时,没有当前的回调/通知。我也找不到任何添加此功能的fork。

I was unable to get ZMQ working while using the traditional OnReceive calls provide by the MFC socket framework within a single thread and adding a 2nd thread to dedicate to ZMQ defeated the whole purpose for using it (it is being used for thread synchronization).

当使用MFC套接字框架在单个线程内提供的传统OnReceive调用并且添加第二个线程专用于ZMQ时,我无法使ZMQ正常工作,从而打败了使用它的整个目的(它用于线程同步)。

My implementation that works ended up dropping MFC sockets and using ZMQ for both my inproc server (for communicating with other thread) as well as my TCP (non-ZMQ) server connection and using a blocking polling call (zmq_poll()) in the OnIdle() method (returning 1 every time to create a busy loop). The blocking poll

我的实现最终删除了MFC套接字并使用ZMQ用于我的inproc服务器(用于与其他线程通信)以及我的TCP(非ZMQ)服务器连接以及在OnIdle中使用阻塞轮询调用(zmq_poll()) ()方法(每次返回1以创建一个繁忙的循环)。阻止民意调查

BOOL CMyThreaClass::OnIdle(LONG lCount)
{
    UNREFERENCED_PARAMETER(lCount);

    zmq_pollitem_t items [] = {
    { m_pZMQInprocServer, 0, ZMQ_POLLIN, 0 },
    { m_pZMQTCPSocket, 0, ZMQ_POLLIN, 0 }
    };
    const int iZMQInfiniteTimeout(-1);
    iResult = zmq_poll(&items[0], sizeof(items) / sizeof(items[0]), iZMQInfiniteTimeout);
    TRACE("zmq_poll result: %d\n", iResult);

    if (items[0].revents & ZMQ_POLLIN)
    {
        sMyStruct sMessage;
        iResult = zmq_recv(m_pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_DONTWAIT); // don't block (the zmq_poll blocks for us)
        TRACE("inproc recv result: %d\n", iResult);
        //  Process inproc messages
        iResult = zmq_send(pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_NULL); // block
        TRACE("inproc send result: %d\n", iResult);
    }
    if (items[1].revents & ZMQ_POLLIN)
    {
        // there will be an ZMQ_IDENTITY identifier on the beginning of the socket buffer, read it off first
        uint8_t id [256];
        size_t id_size = 256;
        iResult = zmq_getsockopt(m_pZMQTCPSocket, ZMQ_IDENTITY, id, &id_size);
        TRACE("getsockopt poll result %d:id %d\n", iResult, id);
        iResult = zmq_recv(m_pZMQTCPSocket, &id, id_Size, ZMQ_DONTWAIT); // don't block
        // now get our actual data
        char szBuffer[1024];
        int iBytesReceived = zmq_recv(m_pZMQSocket, szBuffer, sizeof(szBuffer), ZMQ_DONTWAIT);
        if (iBytesReceived > 0)
        {
            // process TCP data
        }
    }
}

Note: This answer requires using ZMQ 4 or later since earlier versions of ZMQ will not communicate with a regular TCP socket connection.

注意:此答案需要使用ZMQ 4或更高版本,因为早期版本的ZMQ将无法与常规TCP套接字连接进行通信。

#2


0  

You could call zmq_recv() with the ZMQ_NOBLOCK flag inside your app's main message loop by overriding your CWinApp::OnIdle(). zmq_recv will return immediately if there is no data waiting on the socket. If there's data, handle it - but be aware that if you do something slow, you'll make the app unresponsive.

您可以通过覆盖CWinApp :: OnIdle()在应用程序的主消息循环中使用ZMQ_NOBLOCK标志调用zmq_recv()。如果套接字上没有数据等待,zmq_recv将立即返回。如果有数据,请处理它 - 但要注意,如果你做得很慢,你会让应用程序无响应。

Edit: I didn't realize that OnIdle only gets called once when the message queue becomes empty. But according to MSDN's documentation, you can return a nonzero value to keep getting called forever:

编辑:我没有意识到只有当消息队列变空时才会调用OnIdle一次。但根据MSDN的文档,您可以返回非零值以永久调用:

  1. If the message loop checks the message queue and finds no pending messages, it calls OnIdle and supplies 0 as the lCount argument.
  2. 如果消息循环检查消息队列并且未找到任何待处理消息,则它将调用OnIdle并提供0作为lCount参数。
  3. OnIdle performs some processing and returns a nonzero value to indicate it should be called again to do further processing.
  4. OnIdle执行一些处理并返回非零值,以指示应再次调用它以进行进一步处理。
  5. The message loop checks the message queue again. If no messages are pending, it calls OnIdle again, incrementing the lCount argument.
  6. 消息循环再次检查消息队列。如果没有待处理的消息,则再次调用OnIdle,递增lCount参数。
  7. Eventually, OnIdle finishes processing all its idle tasks and returns 0. This tells the message loop to stop calling OnIdle until the next message is received from the message queue, at which point the idle cycle restarts with the argument set to 0.
  8. 最终,OnIdle完成处理其所有空闲任务并返回0.这告诉消息循环停止调用OnIdle,直到从消息队列接收到下一条消息,此时空闲循环重新启动,参数设置为0。

I also found this thread on GameDev.net where a user said:

我还在GameDev.net上找到了这个帖子,用户说:

All of the tools I have ever written in MFC that use D3D use the OnIdle() function:

我在MFC中编写的所有使用D3D的工具都使用OnIdle()函数:

BOOL CD3DEditorApp::OnIdle(LONG lCount) 
{
    // Call base class first
    CWinApp:OnIdle( lCount );

    // game stuff...

    // Always return true - this asks the framework to constantly
    // call the Idle function when it isn't busy doing something
    // else.
    return TRUE;
}

So, at least according to one person, it's a common technique.

所以,至少根据一个人的说法,这是一种常见的技巧。

#3


0  

You can signal from thread to thread with custom Windows messages. Here is a custom message:

您可以使用自定义Windows消息从线程发送信号。这是一条自定义消息:

#define WM_MY_MESSAGE (WM_APP + 1)

To send it to a thread that has a window use PostMessage or SendMessage to the HWND. Add it to the window's message map with ON_MESSAGE.

要将它发送到具有窗口的线程,请将PostMessage或SendMessage用于HWND。使用ON_MESSAGE将其添加到窗口的消息映射中。

To send it to a CWinThread-derived thread that has no windows use PostThreadMessage and receive it with ON_THREAD_MESSAGE.

要将它发送到没有窗口的CWinThread派生线程,请使用PostThreadMessage并使用ON_THREAD_MESSAGE接收它。

#1


1  

Ultimately, the answer is no. There are no current callback/notifications for when data arrives in ZeroMQ that you can link into. I was also unable to find any fork that adds this functionality.

最终,答案是否定的。当数据到达您可以链接到的ZeroMQ时,没有当前的回调/通知。我也找不到任何添加此功能的fork。

I was unable to get ZMQ working while using the traditional OnReceive calls provide by the MFC socket framework within a single thread and adding a 2nd thread to dedicate to ZMQ defeated the whole purpose for using it (it is being used for thread synchronization).

当使用MFC套接字框架在单个线程内提供的传统OnReceive调用并且添加第二个线程专用于ZMQ时,我无法使ZMQ正常工作,从而打败了使用它的整个目的(它用于线程同步)。

My implementation that works ended up dropping MFC sockets and using ZMQ for both my inproc server (for communicating with other thread) as well as my TCP (non-ZMQ) server connection and using a blocking polling call (zmq_poll()) in the OnIdle() method (returning 1 every time to create a busy loop). The blocking poll

我的实现最终删除了MFC套接字并使用ZMQ用于我的inproc服务器(用于与其他线程通信)以及我的TCP(非ZMQ)服务器连接以及在OnIdle中使用阻塞轮询调用(zmq_poll()) ()方法(每次返回1以创建一个繁忙的循环)。阻止民意调查

BOOL CMyThreaClass::OnIdle(LONG lCount)
{
    UNREFERENCED_PARAMETER(lCount);

    zmq_pollitem_t items [] = {
    { m_pZMQInprocServer, 0, ZMQ_POLLIN, 0 },
    { m_pZMQTCPSocket, 0, ZMQ_POLLIN, 0 }
    };
    const int iZMQInfiniteTimeout(-1);
    iResult = zmq_poll(&items[0], sizeof(items) / sizeof(items[0]), iZMQInfiniteTimeout);
    TRACE("zmq_poll result: %d\n", iResult);

    if (items[0].revents & ZMQ_POLLIN)
    {
        sMyStruct sMessage;
        iResult = zmq_recv(m_pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_DONTWAIT); // don't block (the zmq_poll blocks for us)
        TRACE("inproc recv result: %d\n", iResult);
        //  Process inproc messages
        iResult = zmq_send(pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_NULL); // block
        TRACE("inproc send result: %d\n", iResult);
    }
    if (items[1].revents & ZMQ_POLLIN)
    {
        // there will be an ZMQ_IDENTITY identifier on the beginning of the socket buffer, read it off first
        uint8_t id [256];
        size_t id_size = 256;
        iResult = zmq_getsockopt(m_pZMQTCPSocket, ZMQ_IDENTITY, id, &id_size);
        TRACE("getsockopt poll result %d:id %d\n", iResult, id);
        iResult = zmq_recv(m_pZMQTCPSocket, &id, id_Size, ZMQ_DONTWAIT); // don't block
        // now get our actual data
        char szBuffer[1024];
        int iBytesReceived = zmq_recv(m_pZMQSocket, szBuffer, sizeof(szBuffer), ZMQ_DONTWAIT);
        if (iBytesReceived > 0)
        {
            // process TCP data
        }
    }
}

Note: This answer requires using ZMQ 4 or later since earlier versions of ZMQ will not communicate with a regular TCP socket connection.

注意:此答案需要使用ZMQ 4或更高版本,因为早期版本的ZMQ将无法与常规TCP套接字连接进行通信。

#2


0  

You could call zmq_recv() with the ZMQ_NOBLOCK flag inside your app's main message loop by overriding your CWinApp::OnIdle(). zmq_recv will return immediately if there is no data waiting on the socket. If there's data, handle it - but be aware that if you do something slow, you'll make the app unresponsive.

您可以通过覆盖CWinApp :: OnIdle()在应用程序的主消息循环中使用ZMQ_NOBLOCK标志调用zmq_recv()。如果套接字上没有数据等待,zmq_recv将立即返回。如果有数据,请处理它 - 但要注意,如果你做得很慢,你会让应用程序无响应。

Edit: I didn't realize that OnIdle only gets called once when the message queue becomes empty. But according to MSDN's documentation, you can return a nonzero value to keep getting called forever:

编辑:我没有意识到只有当消息队列变空时才会调用OnIdle一次。但根据MSDN的文档,您可以返回非零值以永久调用:

  1. If the message loop checks the message queue and finds no pending messages, it calls OnIdle and supplies 0 as the lCount argument.
  2. 如果消息循环检查消息队列并且未找到任何待处理消息,则它将调用OnIdle并提供0作为lCount参数。
  3. OnIdle performs some processing and returns a nonzero value to indicate it should be called again to do further processing.
  4. OnIdle执行一些处理并返回非零值,以指示应再次调用它以进行进一步处理。
  5. The message loop checks the message queue again. If no messages are pending, it calls OnIdle again, incrementing the lCount argument.
  6. 消息循环再次检查消息队列。如果没有待处理的消息,则再次调用OnIdle,递增lCount参数。
  7. Eventually, OnIdle finishes processing all its idle tasks and returns 0. This tells the message loop to stop calling OnIdle until the next message is received from the message queue, at which point the idle cycle restarts with the argument set to 0.
  8. 最终,OnIdle完成处理其所有空闲任务并返回0.这告诉消息循环停止调用OnIdle,直到从消息队列接收到下一条消息,此时空闲循环重新启动,参数设置为0。

I also found this thread on GameDev.net where a user said:

我还在GameDev.net上找到了这个帖子,用户说:

All of the tools I have ever written in MFC that use D3D use the OnIdle() function:

我在MFC中编写的所有使用D3D的工具都使用OnIdle()函数:

BOOL CD3DEditorApp::OnIdle(LONG lCount) 
{
    // Call base class first
    CWinApp:OnIdle( lCount );

    // game stuff...

    // Always return true - this asks the framework to constantly
    // call the Idle function when it isn't busy doing something
    // else.
    return TRUE;
}

So, at least according to one person, it's a common technique.

所以,至少根据一个人的说法,这是一种常见的技巧。

#3


0  

You can signal from thread to thread with custom Windows messages. Here is a custom message:

您可以使用自定义Windows消息从线程发送信号。这是一条自定义消息:

#define WM_MY_MESSAGE (WM_APP + 1)

To send it to a thread that has a window use PostMessage or SendMessage to the HWND. Add it to the window's message map with ON_MESSAGE.

要将它发送到具有窗口的线程,请将PostMessage或SendMessage用于HWND。使用ON_MESSAGE将其添加到窗口的消息映射中。

To send it to a CWinThread-derived thread that has no windows use PostThreadMessage and receive it with ON_THREAD_MESSAGE.

要将它发送到没有窗口的CWinThread派生线程,请使用PostThreadMessage并使用ON_THREAD_MESSAGE接收它。