ZMQ中请求-应答模式的可靠性设计

时间:2023-02-16 11:01:21

一、什么是可靠性?

  要给可靠性下定义,我们可以先界定它的相反面——故障。如果我们可以处理某些类型的故障,那么我们的模型对于这些故障就是可靠的。下面我们就来列举分布式ZMQ应用程序中可能发生的问题,从可能性高的故障开始:

    1、应用程序代码是最大的故障来源。程序会崩溃或中止,停止对数据来源的响应,或是响应得慢,耗尽内存等。

    2、系统代码,如使用ZMQ编写的中间件,也会意外中止。系统代码应该要比应用程序代码更为可靠,但毕竟也有可能崩溃。特别是当系统代码与速度过慢的客户端交互时,很容易耗尽内存。

    3、消息队列溢出,典型的情况是系统代码中没有对蛮客户端做积极的处理,任由消息队列溢出。

    4、网络临时中断,造成消息丢失。这类错误ZMQ应用程序是无法及时发现的,因为ZMQ会自动进行重连。

    5、硬件系统崩溃,导致所有进程中止。

    6、网络会出现特殊情形的中断,如交换机的某个端口发生故障,导致部分网络无法访问。

    7、数据中心可能遭受雷击、地震、火灾、电压过载、冷却系统失效等。

  想要让软件系统规避上述所有的风险,需要大量的人力物力,故不在本指南的讨论范围之内。

  由于前五个故障类型涵盖了99.9%的情形(这一数据源自我近期进行的一项研究),所以我们会深入探讨。如果你的公司大到足以考虑最后两种情形,那请及时联系我,因为我正愁没钱将我家后院的大坑建成游泳池。

二、可靠性设计

  简单地来说,可靠性就是当程序发生故障时也能顺利地运行下去,这要比搭建一个消息系统来得困难得多。我们会根据ZMQ提供的每一种核心消息模式,来看看如何保障代码的持续运行。

    > 段落引用1、请求-应答模式:当服务端在处理请求是中断,客户端能够得知这一信息,并停止接收消息,转而选择等待重试、请求另一服务端等操作。这里我们暂不讨论客户端发生问题的情形。

    2、发布-订阅模式:如果客户端收到一些消息后意外中止,服务端是不知道这一情况的。发布-订阅模式中的订阅者不会返回任何消息给发布者。但是,订阅者可以通过其他方式联系服务端,如请求-应答模式,要求服务端重发消息。这里我们暂不讨论服务端发生问题的情形。此外,订阅者可以通过某些方式检查自身是否运行得过慢,并采取相应措施(向操作者发出警告、中止等)。

    3、管道模式:如果worker意外终止,任务分发器将无从得知。管道模式和发布-订阅模式类似,只朝一个方向发送消息。但是,下游的结果收集器可以检测哪项任务没有完成,并告诉任务分发器重新分配该任务。如果任务分发器或结果收集器意外中止了,那客户端发出的请求只能另作处理。所以说,系统代码真的要减少出错的几率,因为这很难处理。

  本章主要讲解请求-应答模式中的可靠性设计,其他模式将在后续章节中讲解。

  最基本的请求应答模式是REQ客户端发送一个同步的请求至REP服务端,这种模式的可靠性很低。如果服务端在处理请求时中止,那客户端会永远处于等待状态。

  相比TCP协议,ZMQ提供了自动重连机制、消息分发的负载均衡等。但是,在真实环境中这也是不够的。唯一可以完全信任基本请求-应答模式的应用场景是同一进程的两个线程之间进行通信,没有网络问题或服务器失效的情况。

  但是,只要稍加修饰,这种基本的请求-应答模式就能很好地在现实环境中工作了。我喜欢将其称为“海盗”模式。

  粗略地讲,客户端连接服务端有三种方式,每种方式都需要不同的可靠性设计:

    1、多个客户端直接和单个服务端进行通信。使用场景:只有一个单点服务器,所有客户端都需要和它通信。需处理的故障:服务器崩溃和重启;网络连接中断。

    2、多个客户端和单个队列装置通信,该装置将请求分发给多个服务端。使用场景:任务分发。需处理的故障:worker崩溃和重启,死循环,过载;队列装置崩溃和重启;网络中断。

    3、多个客户端直接和多个服务端通信,无中间件。使用场景:类似域名解析的分布式服务。需处理的故障:服务端崩溃和重启,死循环,过载;网络连接中断。

  以上每种设计都必须有所取舍,很多时候会混合使用。下面我们详细说明。

2.1、懒惰海盗模式

  我们可以通过在客户端进行简单的设置,来实现可靠的请求-应答模式。我暂且称之为“懒惰的海盗”(Lazy Pirate)模式。

  在接收应答时,我们不进行同步等待,而是做以下操作:

    1、对REQ套接字进行轮询,当消息抵达时才进行接收。

    2、请求超时后重发消息,循环多次。

    3、若仍无消息,则结束当前事务。

ZMQ中请求-应答模式的可靠性设计

  使用REQ套接字时必须严格遵守发送-接收过程,因为它内部采用了一个有限状态机来限定状态,这一特性会让我们应用“海盗”模式时遇上一些麻烦。最简单的做法是将REQ套接字关闭重启,从而打破这一限定。

   lpclient: Lazy Pirate client in C

//
//  Lazy Pirate client
//  使用zmq_poll轮询来实现安全的请求-应答
//  运行时可随机关闭或重启lpserver程序
//
#include "czmq.h"
 
#define REQUEST_TIMEOUT     2500    //  毫秒, (> 1000!)
#define REQUEST_RETRIES     3       //  尝试次数
#define SERVER_ENDPOINT     "tcp://localhost:5555"
 
int main (void)
{
    zctx_t *ctx = zctx_new ();
    printf ("I: 正在连接服务器...\n");
    void *client = zsocket_new (ctx, ZMQ_REQ);
    assert (client);
    zsocket_connect (client, SERVER_ENDPOINT);
 
    int sequence = 0;
    int retries_left = REQUEST_RETRIES;
    while (retries_left && !zctx_interrupted) {
        //  发送一个请求,并开始接收消息
        char request [10];
        sprintf (request, "%d", ++sequence);
        zstr_send (client, request);
 
        int expect_reply = 1;
        while (expect_reply) {
            //  对套接字进行轮询,并设置超时时间
            zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
            int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
            if (rc == -1)
                break;          //  中断
 
            //  如果接收到回复则进行处理
            if (items [0].revents & ZMQ_POLLIN) {
                //  收到服务器应答,必须和请求时的序号一致
                char *reply = zstr_recv (client);
                if (!reply)
                    break;      //  Interrupted
                if (atoi (reply) == sequence) {
                    printf ("I: 服务器返回正常 (%s)\n", reply);
                    retries_left = REQUEST_RETRIES;
                    expect_reply = 0;
                }
                else
                    printf ("E: 服务器返回异常: %s\n",
                        reply);
 
                free (reply);
            }
            else
            if (--retries_left == 0) {
                printf ("E: 服务器不可用,取消操作\n");
                break;
            }
            else {
                printf ("W: 服务器没有响应,正在重试...\n");
                //  关闭旧套接字,并建立新套接字
                zsocket_destroy (ctx, client);
                printf ("I: 服务器重连中...\n");
                client = zsocket_new (ctx, ZMQ_REQ);
                zsocket_connect (client, SERVER_ENDPOINT);
                //  使用新套接字再次发送请求
                zstr_send (client, request);
            }
        }
    }
    zctx_destroy (&ctx);
    return 0;
}

  lpserver: Lazy Pirate server in C

//
//  Lazy Pirate server
//  将REQ套接字连接至 tcp://*:5555
//  和hwserver程序类似,除了以下两点:
//   - 直接输出请求内容
//   - 随机地降慢运行速度,或中止程序,模拟崩溃
//
#include "zhelpers.h"
 
int main (void)
{
    srandom ((unsigned) time (NULL));
 
    void *context = zmq_init (1);
    void *server = zmq_socket (context, ZMQ_REP);
    zmq_bind (server, "tcp://*:5555");
 
    int cycles = 0;
    while (1) {
        char *request = s_recv (server);
        cycles++;
 
        //  循环几次后开始模拟各种故障
        if (cycles > 3 && randof (3) == 0) {
            printf ("I: 模拟程序崩溃\n");
            break;
        }
        else
        if (cycles > 3 && randof (3) == 0) {
            printf ("I: 模拟CPU过载\n");
            sleep (2);
        }
        printf ("I: 正常请求 (%s)\n", request);
        sleep (1);              //  耗时的处理过程
        s_send (server, request);
        free (request);
    }
    zmq_close (server);
    zmq_term (context);
    return 0;
}

  运行这个测试用例时,可以打开两个控制台,服务端会随机发生故障,你可以看看客户端的反应。服务端的典型输出如下:

I: normal request (1)
I: normal request (2)
I: normal request (3)
I: simulating CPU overload
I: normal request (4)
I: simulating a crash

  客户端的输出是:

I: connecting to server...
I: server replied OK (1)
I: server replied OK (2)
I: server replied OK (3)
W: no response from server, retrying...
I: connecting to server...
W: no response from server, retrying...
I: connecting to server...
E: server seems to be offline, abandoning

  客户端为每次请求都加上了序列号,并检查收到的应答是否和序列号一致,以保证没有请求或应答丢失,同一个应答收到多次或乱序。多运行几次实例,看看是否真的能够解决问题。现实环境中你不需要使用到序列号,那只是为了证明这一方式是可行的。

  客户端使用REQ套接字进行请求,并在发生问题时打开一个新的套接字来,绕过REQ强制的发送/接收过程。可能你会想用DEALER套接字,但这并不是一个好主意。首先,DEALER并不会像REQ那样处理信封(如果你不知道信封是什么,那更不能用DEALER了)。其次,你可能会获得你并不想得到的结果。

  这一方案的优劣是:

    1、优点:简单明了,容易实施;

    2、优点:可以方便地应用到现有的客户端和服务端程序中;

    3、优点:ZMQ有自动重连机制;

    4、缺点:单点服务发生故障时不能定位到新的可用服务。

2.2、简单海盗模式

  在第二种模式中,我们使用一个队列装置来扩展上述的“懒惰的海盗”模式,使客户端能够透明地和多个服务端通信。这里的服务端可以定义为worker。我们可以从最基础的模型开始,分阶段实施这个方案。

  在所有的海盗模式中,worker是无状态的,或者说存在着一个我们所不知道的公共状态,如共享数据库。队列装置的存在意味着worker可以在client毫不知情的情况下随意进出。一个worker死亡后,会有另一个worker接替它的工作。这种拓扑结果非常简洁,但唯一的缺点是队列装置本身会难以维护,可能造成单点故障。

  在第三章中,队列装置的基本算法是最近最少使用算法。那么,如果worker死亡或阻塞,我们需要做些什么?答案是很少很少。我们已经在client中加入了重试的机制,所以,使用基本的LRU队列就可以运作得很好了。这种做法也符合ZMQ的逻辑,所以我们可以通过在点对点交互中插入一个简单的队列装置来扩展它:

ZMQ中请求-应答模式的可靠性设计

  我们可以直接使用“懒惰的海盗”模式中的client,以下是队列装置的代码:

  spqueue: Simple Pirate queue in C

//
//  简单海盗队列
//  
//  这个装置和LRU队列完全一致,不存在任何可靠性机制,依靠client的重试来保证装置的运行
//
#include "czmq.h"
 
#define LRU_READY   "\001"      //  消息:worker准备就绪
 
int main (void)
{
    //  准备上下文和套接字
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    void *backend = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "tcp://*:5555");    //  client端点
    zsocket_bind (backend,  "tcp://*:5556");    //  worker端点
 
    //  存放可用worker的队列
    zlist_t *workers = zlist_new ();
 
    while (1) {
        zmq_pollitem_t items [] = {
            { backend,  0, ZMQ_POLLIN, 0 },
            { frontend, 0, ZMQ_POLLIN, 0 }
        };
        //  当有可用的woker时,轮询前端端点
        int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
        if (rc == -1)
            break;              //  中断
 
        //  处理后端端点的worker消息
        if (items [0].revents & ZMQ_POLLIN) {
            //  使用worker的地址进行LRU排队
            zmsg_t *msg = zmsg_recv (backend);
            if (!msg)
                break;          //  中断
            zframe_t *address = zmsg_unwrap (msg);
            zlist_append (workers, address);
 
            //  如果消息不是READY,则转发给client
            zframe_t *frame = zmsg_first (msg);
            if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
                zmsg_destroy (&msg);
            else
                zmsg_send (&msg, frontend);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  获取client请求,转发给第一个可用的worker
            zmsg_t *msg = zmsg_recv (frontend);
            if (msg) {
                zmsg_wrap (msg, (zframe_t *) zlist_pop (workers));
                zmsg_send (&msg, backend);
            }
        }
    }
    //  程序运行结束,进行清理
    while (zlist_size (workers)) {
        zframe_t *frame = (zframe_t *) zlist_pop (workers);
        zframe_destroy (&frame);
    }
    zlist_destroy (&workers);
    zctx_destroy (&ctx);
    return 0;
}

  以下是worker的代码,用到了“懒惰的海盗”服务,并将其调整为LRU模式(使用REQ套接字传递“已就绪”信号):

  spworker: Simple Pirate worker in C

//
//  简单海盗模式worker
//  
//  使用REQ套接字连接tcp://*:5556,使用LRU算法实现worker
//
#include "czmq.h"
#define LRU_READY   "\001"      //  消息:worker已就绪
 
int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_REQ);
 
    //  使用随机符号来指定套接字标识,方便追踪
    srandom ((unsigned) time (NULL));
    char identity [10];
    sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000));
    zmq_setsockopt (worker, ZMQ_IDENTITY, identity, strlen (identity));
    zsocket_connect (worker, "tcp://localhost:5556");
 
    //  告诉代理worker已就绪
    printf ("I: (%s) worker准备就绪\n", identity);
    zframe_t *frame = zframe_new (LRU_READY, 1);
    zframe_send (&frame, worker, 0);
 
    int cycles = 0;
    while (1) {
        zmsg_t *msg = zmsg_recv (worker);
        if (!msg)
            break;              //  中断
 
        //  经过几轮循环后,模拟各种问题
        cycles++;
        if (cycles > 3 && randof (5) == 0) {
            printf ("I: (%s) 模拟崩溃\n", identity);
            zmsg_destroy (&msg);
            break;
        }
        else
        if (cycles > 3 && randof (5) == 0) {
            printf ("I: (%s) 模拟CPU过载\n", identity);
            sleep (3);
            if (zctx_interrupted)
                break;
        }
        printf ("I: (%s) 正常应答\n", identity);
        sleep (1);              //  进行某些处理
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return 0;
}

  运行上述事例,启动多个worker,一个client,以及一个队列装置,顺序随意。你可以看到worker最终都会崩溃或死亡,client则多次重试并最终放弃。装置从来不会停止,你可以任意重启worker和client,这个模型可以和任意个worker、client交互。

2.3、偏执海盗模式

  “简单海盗队列”模式工作得非常好,主要是因为它只是两个现有模式的结合体。不过,它也有一些缺点:

    1、该模式无法处理队列的崩溃或重启。client会进行重试,但worker不会重启。虽然ZMQ会自动重连worker的套接字,但对于新启动的队列装置来说,由于worker并没有发送“已就绪”的消息,所以它相当于是不存在的。为了解决这一问题,我们需要从队列发送心跳给worker,这样worker就能知道队列是否已经死亡。

    2、队列没有检测worker是否已经死亡,所以当worker在处于空闲状态时死亡,队列装置只有在发送了某个请求之后才会将该worker从队列中移除。这时,client什么都不能做,只能等待。这不是一个致命的问题,但是依然是不够好的。所以,我们需要从worker发送心跳给队列装置,从而让队列得知worker什么时候消亡。

  我们使用一个名为“偏执的海盗模式”来解决上述两个问题。

  之前我们使用REQ套接字作为worker的套接字类型,但在偏执海盗模式中我们会改用DEALER套接字,从而使我们能够任意地发送和接受消息,而不是像REQ套接字那样必须完成发送-接受循环。而DEALER的缺点是我们必须自己管理消息信封。

ZMQ中请求-应答模式的可靠性设计

  我们仍会使用懒惰海盗模式的client,以下是偏执海盗的队列装置代码:

  ppqueue: Paranoid Pirate queue in C

//
//  偏执海盗队列
//
#include "czmq.h"
 
#define HEARTBEAT_LIVENESS  3       //  心跳健康度,3-5是合理的
#define HEARTBEAT_INTERVAL  1000    //  单位:毫秒
 
//  偏执海盗协议的消息代码
#define PPP_READY       "\001"      //  worker已就绪
#define PPP_HEARTBEAT   "\002"      //  worker心跳
 
 
//  使用以下结构表示worker队列中的一个有效的worker
 
typedef struct {
    zframe_t *address;          //  worker的地址
    char *identity;             //  可打印的套接字标识
    int64_t expiry;             //  过期时间
} worker_t;
 
//  创建新的worker
static worker_t *
s_worker_new (zframe_t *address)
{
    worker_t *self = (worker_t *) zmalloc (sizeof (worker_t));
    self->address = address;
    self->identity = zframe_strdup (address);
    self->expiry = zclock_time () + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
    return self;
}
 
//  销毁worker结构,包括标识
static void
s_worker_destroy (worker_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        worker_t *self = *self_p;
        zframe_destroy (&self->address);
        free (self->identity);
        free (self);
        *self_p = NULL;
    }
}
 
//  worker已就绪,将其移至列表末尾
static void
s_worker_ready (worker_t *self, zlist_t *workers)
{
    worker_t *worker = (worker_t *) zlist_first (workers);
    while (worker) {
        if (streq (self->identity, worker->identity)) {
            zlist_remove (workers, worker);
            s_worker_destroy (&worker);
            break;
        }
        worker = (worker_t *) zlist_next (workers);
    }
    zlist_append (workers, self);
}
 
//  返回下一个可用的worker地址
static zframe_t *
s_workers_next (zlist_t *workers)
{
    worker_t *worker = zlist_pop (workers);
    assert (worker);
    zframe_t *frame = worker->address;
    worker->address = NULL;
    s_worker_destroy (&worker);
    return frame;
}
 
//  寻找并销毁已过期的worker。
//  由于列表中最旧的worker排在最前,所以当找到第一个未过期的worker时就停止。
static void
s_workers_purge (zlist_t *workers)
{
    worker_t *worker = (worker_t *) zlist_first (workers);
    while (worker) {
        if (zclock_time () < worker->expiry)
            break;              //  worker未过期,停止扫描
 
        zlist_remove (workers, worker);
        s_worker_destroy (&worker);
        worker = (worker_t *) zlist_first (workers);
    }
}

int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    void *backend  = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "tcp://*:5555");    //  client端点
    zsocket_bind (backend,  "tcp://*:5556");    //  worker端点
    //  List of available workers
    zlist_t *workers = zlist_new ();
 
    //  规律地发送心跳
    uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
 
    while (1) {
        zmq_pollitem_t items [] = {
            { backend,  0, ZMQ_POLLIN, 0 },
            { frontend, 0, ZMQ_POLLIN, 0 }
        };
        //  当存在可用worker时轮询前端端点
        int rc = zmq_poll (items, zlist_size (workers)? 2: 1,
            HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断
 
        //  处理后端worker请求
        if (items [0].revents & ZMQ_POLLIN) {
            //  使用worker地址进行LRU路由
            zmsg_t *msg = zmsg_recv (backend);
            if (!msg)
                break;          //  中断
 
            //  worker的任何信号均表示其仍然存活
            zframe_t *address = zmsg_unwrap (msg);
            worker_t *worker = s_worker_new (address);
            s_worker_ready (worker, workers);
 
            //  处理控制消息,或者将应答转发给client
            if (zmsg_size (msg) == 1) {
                zframe_t *frame = zmsg_first (msg);
                if (memcmp (zframe_data (frame), PPP_READY, 1)
                &&  memcmp (zframe_data (frame), PPP_HEARTBEAT, 1)) {
                    printf ("E: invalid message from worker");
                    zmsg_dump (msg);
                }
                zmsg_destroy (&msg);
            }
            else
                zmsg_send (&msg, frontend);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  获取下一个client请求,交给下一个可用的worker
            zmsg_t *msg = zmsg_recv (frontend);
            if (!msg)
                break;          //  中断
            zmsg_push (msg, s_workers_next (workers));
            zmsg_send (&msg, backend);
        }
 
        //  发送心跳给空闲的worker
        if (zclock_time () >= heartbeat_at) {
            worker_t *worker = (worker_t *) zlist_first (workers);
            while (worker) {
                zframe_send (&worker->address, backend,
                             ZFRAME_REUSE + ZFRAME_MORE);
                zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
                zframe_send (&frame, backend, 0);
                worker = (worker_t *) zlist_next (workers);
            }
            heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        }
        s_workers_purge (workers);
    }
 
    //  程序结束后进行清理
    while (zlist_size (workers)) {
        worker_t *worker = (worker_t *) zlist_pop (workers);
        s_worker_destroy (&worker);
    }
    zlist_destroy (&workers);
    zctx_destroy (&ctx);
    return 0;
}

  该队列装置使用心跳机制扩展了LRU模式,看起来很简单,但要想出这个主意还挺难的。下文会更多地介绍心跳机制。

  以下是偏执海盗的worker代码:

  ppworker: Paranoid Pirate worker in C

//
//  偏执海盗worker
//
#include "czmq.h"
 
#define HEARTBEAT_LIVENESS  3       //  合理值:3-5
#define HEARTBEAT_INTERVAL  1000    //  单位:毫秒
#define INTERVAL_INIT       1000    //  重试间隔
#define INTERVAL_MAX       32000    //  回退算法最大值
 
//  偏执海盗规范的常量定义
#define PPP_READY       "\001"      //  消息:worker已就绪
#define PPP_HEARTBEAT   "\002"      //  消息:worker心跳
 
//  返回一个连接至偏执海盗队列装置的套接字
 
static void *
s_worker_socket (zctx_t *ctx) {
    void *worker = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (worker, "tcp://localhost:5556");
 
    //  告知队列worker已准备就绪
    printf ("I: worker已就绪\n");
    zframe_t *frame = zframe_new (PPP_READY, 1);
    zframe_send (&frame, worker, 0);
 
    return worker;
}
 
int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *worker = s_worker_socket (ctx);
 
    //  如果心跳健康度为零,则表示队列装置已死亡
    size_t liveness = HEARTBEAT_LIVENESS;
    size_t interval = INTERVAL_INIT;
 
    //  规律地发送心跳
    uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
 
    srandom ((unsigned) time (NULL));
    int cycles = 0;
    while (1) {
        zmq_pollitem_t items [] = { { worker,  0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断
 
        if (items [0].revents & ZMQ_POLLIN) {
            //  获取消息
            //  - 3段消息,信封+内容,表示一个请求
            //  - 1段消息,表示心跳
            zmsg_t *msg = zmsg_recv (worker);
            if (!msg)
                break;          //  中断
 
            if (zmsg_size (msg) == 3) {
                //  若干词循环后模拟各种问题
                cycles++;
                if (cycles > 3 && randof (5) == 0) {
                    printf ("I: 模拟崩溃\n");
                    zmsg_destroy (&msg);
                    break;
                }
                else
                if (cycles > 3 && randof (5) == 0) {
                    printf ("I: 模拟CPU过载\n");
                    sleep (3);
                    if (zctx_interrupted)
                        break;
                }
                printf ("I: 正常应答\n");
                zmsg_send (&msg, worker);
                liveness = HEARTBEAT_LIVENESS;
                sleep (1);              //  做一些处理工作
                if (zctx_interrupted)
                    break;
            }
            else
            if (zmsg_size (msg) == 1) {
                zframe_t *frame = zmsg_first (msg);
                if (memcmp (zframe_data (frame), PPP_HEARTBEAT, 1) == 0)
                    liveness = HEARTBEAT_LIVENESS;
                else {
                    printf ("E: 非法消息\n");
                    zmsg_dump (msg);
                }
                zmsg_destroy (&msg);
            }
            else {
                printf ("E: 非法消息\n");
                zmsg_dump (msg);
            }
            interval = INTERVAL_INIT;
        }
        else
        if (--liveness == 0) {
            printf ("W: 心跳失败,无法连接队列装置\n");
            printf ("W: %zd 毫秒后进行重连...\n", interval);
            zclock_sleep (interval);
 
            if (interval < INTERVAL_MAX)
                interval *= 2;
            zsocket_destroy (ctx, worker);
            worker = s_worker_socket (ctx);
            liveness = HEARTBEAT_LIVENESS;
        }
 
        //  适时发送心跳给队列
        if (zclock_time () > heartbeat_at) {
            heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
            printf ("I: worker心跳\n");
            zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
            zframe_send (&frame, worker, 0);
        }
    }
    zctx_destroy (&ctx);
    return 0;
}

  几点说明:

    1、代码中包含了几处失败模拟,和先前一样。这会让代码极难维护,所以当投入使用时,应当移除这些模拟代码。

    2、偏执海盗模式中队列的心跳有时会不正常,下文会讲述这一点。

    3、worker使用了一种类似于懒惰海盗client的重试机制,但有两点不同:

      1、回退算法设置;

      2、永不言弃。

  尝试运行以下代码,跑通流程:

ppqueue &
for i in 1 2 3 4; do
    ppworker &
    sleep 1
done
lpclient &

  你会看到worker逐个崩溃,client在多次尝试后放弃。你可以停止并重启队列装置,client和worker会相继重连,并正确地发送、处理和接收请求,顺序不会混乱。所以说,整个通信过程只有两种情形:交互成功,或client最终放弃。