节点。js,插座。io, Redis pub/sub - high volume,低延迟困难

时间:2021-04-19 19:00:13

When conjoining socket.io/node.js and redis pub/sub in an attempt to create a real-time web broadcast system driven by server events that can handle multiple transports, there seems to be three approaches:

当连接socket . io /节点。js和redis pub/sub试图创建一个由服务器事件驱动、可以处理多种传输的实时web广播系统,有三种方法:

  1. 'createClient' a redis connection and subscribe to channel(s). On socket.io client connection, join the client into a socket.io room. In the redis.on("message", ...) event, call io.sockets.in(room).emit("event", data) to distribute to all clients in the relevant room. Like How to reuse redis connection in socket.io?

    “createClient”是一个redis连接并订阅通道。套接字。io客户端连接,将客户端连接到套接字中。io的房间。在复述。在(“消息”,…)事件中,打电话给io.s。发送(“事件”、数据)到相关房间的所有客户端。比如如何在socket.io中重用redis连接?

  2. 'createClient' a redis connection. On socket.io client connection, join the client into a socket.io room and subscribe to relevant redis channel(s). Include redis.on("message", ...) inside the client connection closure and on receipt of message call client.emit("event", data) to raise the event on the specific client. Like the answer in Examples in using RedisStore in socket.io

    createClient复述,连接。套接字。io客户端连接,将客户端连接到套接字中。io房间和订阅相关的redis频道。包括复述。在(“message”,…)客户端连接闭包内和收到消息调用客户端。发出(“事件”,数据)来在特定的客户机上引发事件。类似于在socket.com .io中使用RedisStore的示例中的答案

  3. Use the RedisStore baked into socket.io and 'broadcast' from the single "dispatch" channel in Redis following the socketio-spec protocol.

    使用在插座内的再存储。io和来自Redis的单一“分派”频道的“广播”遵循socketio规范协议。

Number 1 allows handling the Redis sub and associated event once for all clients. Number 2 offers a more direct hook into Redis pub/sub. Number 3 is simpler, but offers little control over the messaging events.

数字1允许为所有客户端处理一次Redis子事件和关联事件。数字2提供了一个与Redis pub/sub更直接的联系。数字3比较简单,但对消息传递事件的控制很少。

However, in my tests, all exhibit unexpectedly low performance with more than 1 connected client. The server events in question are 1,000 messages published to a redis channel as quickly as possible, to be distributed as quickly as possible. Performance is measured by timings at the connected clients (socket.io-client based that log timestamps into a Redis list for analysis).

然而,在我的测试中,在超过一个连接的客户端上,所有的表现都表现出出乎意料的低性能。所涉及的服务器事件是尽可能快地将1,000条消息发布到redis通道,以便尽快分发。性能是通过连接的客户端(套接字)的计时来衡量的。基于io-client的日志时间戳到一个Redis列表中进行分析)。

What I surmise is that in option 1, server receives the message, then sequentially writes it to all connected clients. In option 2, server receives each message multiple times (once per client subscription) and writes it to the relevant client. In either case, the server doesn't get to the second message event until it's communicated to all connected clients. A situation clearly exacerbated with rising concurrency.

我推测,在选项1中,服务器接收消息,然后按顺序将其写入所有连接的客户端。在选项2中,服务器多次接收每个消息(每个客户端订阅一次)并将其写入相关客户端。在任何一种情况下,服务器都不会到达第二个消息事件,直到它与所有连接的客户端通信。并发性的增加明显加剧了这种情况。

This seems at odds with the perceived wisdom of the stacks capabilities. I want to believe, but I'm struggling.

这似乎与栈的感知智慧不一致。我想相信,但我在挣扎。

Is this scenario (low latency distribution of high volume of messages) just not an option with these tools (yet?), or am I missing a trick?

这个场景(低延迟的大量消息分发)不是这些工具的选项(还没有吗?),还是我漏掉了一个技巧?

1 个解决方案

#1


29  

I thought this was a reasonable question and had researched it briefly a while back. I spent a little time searching for examples that you may be able to pick up some helpful tips from.

我认为这是一个合理的问题,并在一段时间内研究过。我花了一点时间搜索示例,以便您能够从其中获得一些有用的技巧。

Examples

I like to begin with straight forward examples:

我喜欢从直接的例子开始:

The light sample is a single page (note you'll want to replace redis-node-client with something like node_redis by Matt Ranney:

light示例是一个单独的页面(注意,您希望用Matt Ranney的node_redis之类的东西替换redis-node-client:

/*
 * Mclarens Bar: Redis based Instant Messaging
 * Nikhil Marathe - 22/04/2010

 * A simple example of an IM client implemented using
 * Redis PUB/SUB commands so that all the communication
 * is offloaded to Redis, and the node.js code only
 * handles command interpretation,presentation and subscribing.
 * 
 * Requires redis-node-client and a recent version of Redis
 *    http://code.google.com/p/redis
 *    http://github.com/fictorial/redis-node-client
 *
 * Start the server then telnet to port 8000
 * Register with NICK <nick>, use WHO to see others
 * Use TALKTO <nick> to initiate a chat. Send a message
 * using MSG <nick> <msg>. Note its important to do a
 * TALKTO so that both sides are listening. Use STOP <nick>
 * to stop talking to someone, and QUIT to exit.
 *
 * This code is in the public domain.
 */
var redis = require('./redis-node-client/lib/redis-client');

var sys = require('sys');
var net = require('net');

var server = net.createServer(function(stream) {
    var sub; // redis connection
    var pub;
    var registered = false;
    var nick = "";

    function channel(a,b) {
    return [a,b].sort().join(':');
    }

    function shareTable(other) {
    sys.debug(nick + ": Subscribing to "+channel(nick,other));
    sub.subscribeTo(channel(nick,other), function(channel, message) {
        var str = message.toString();
        var sender = str.slice(0, str.indexOf(':'));
        if( sender != nick )
        stream.write("[" + sender + "] " + str.substr(str.indexOf(':')+1) + "\n");
    });
    }

    function leaveTable(other) {
    sub.unsubscribeFrom(channel(nick,other), function(err) {
        stream.write("Stopped talking to " + other+ "\n");
    });
    }

    stream.addListener("connect", function() {
    sub = redis.createClient();
    pub = redis.createClient();
    });

    stream.addListener("data", function(data) {
    if( !registered ) {
        var msg = data.toString().match(/^NICK (\w*)/);
        if(msg) {
        stream.write("SERVER: Hi " + msg[1] + "\n");
        pub.sadd('mclarens:inside', msg[1], function(err) {
            if(err) {
            stream.end();
            }
            registered = true;
            nick = msg[1];
// server messages
            sub.subscribeTo( nick + ":info", function(nick, message) {
            var m = message.toString().split(' ');
            var cmd = m[0];
            var who = m[1];
            if( cmd == "start" ) {
                stream.write( who + " is now talking to you\n");
                shareTable(who);
            }
            else if( cmd == "stop" ) {
                stream.write( who + " stopped talking to you\n");
                leaveTable(who);
            }
            });
        });
        }
        else {
        stream.write("Please register with NICK <nickname>\n");
        }
        return;
    }

    var fragments = data.toString().replace('\r\n', '').split(' ');
    switch(fragments[0]) {
    case 'TALKTO':
        pub.publish(fragments[1]+":info", "start " + nick, function(a,b) {
        });
        shareTable(fragments[1]);
        break;
    case 'MSG':
        pub.publish(channel(nick, fragments[1]),
            nick + ':' +fragments.slice(2).join(' '),
              function(err, reply) {
              if(err) {
                  stream.write("ERROR!");
              }
              });
        break;
    case 'WHO':
        pub.smembers('mclarens:inside', function(err, users) {
        stream.write("Online:\n" + users.join('\n') + "\n");
        });
        break;
    case 'STOP':
        leaveTable(fragments[1]);
        pub.publish(fragments[1]+":info", "stop " + nick, function() {});
        break;
    case 'QUIT':
        stream.end();
        break;
    }
    });

    stream.addListener("end", function() {
    pub.publish(nick, nick + " is offline");
    pub.srem('mclarens:inside', nick, function(err) {
        if(err) {
        sys.debug("Could not remove client");
        }
    });
    });
});

server.listen(8000, "localhost");

Documents

There's a ton of documentation out there, and the apis are rapidly changing on this type of stack so you'll have to weigh the time relevance of each doc.

有大量的文档,在这种类型的堆栈上api正在快速变化,所以您必须权衡每个文档的时间相关性。

Related Questions

Just a few related questions, this is a hot topic on stack:

有几个相关的问题,这是一个热门话题:

Notable tips (ymmv)

Turn off or optimize socket pooling, use efficient bindings, monitor latency, and make sure you're not duplicating work (ie no need to publish to all listeners twice).

关闭或优化套接字池,使用高效的绑定,监视延迟,并确保不重复工作(即不需要向所有监听器发布两次)。

#1


29  

I thought this was a reasonable question and had researched it briefly a while back. I spent a little time searching for examples that you may be able to pick up some helpful tips from.

我认为这是一个合理的问题,并在一段时间内研究过。我花了一点时间搜索示例,以便您能够从其中获得一些有用的技巧。

Examples

I like to begin with straight forward examples:

我喜欢从直接的例子开始:

The light sample is a single page (note you'll want to replace redis-node-client with something like node_redis by Matt Ranney:

light示例是一个单独的页面(注意,您希望用Matt Ranney的node_redis之类的东西替换redis-node-client:

/*
 * Mclarens Bar: Redis based Instant Messaging
 * Nikhil Marathe - 22/04/2010

 * A simple example of an IM client implemented using
 * Redis PUB/SUB commands so that all the communication
 * is offloaded to Redis, and the node.js code only
 * handles command interpretation,presentation and subscribing.
 * 
 * Requires redis-node-client and a recent version of Redis
 *    http://code.google.com/p/redis
 *    http://github.com/fictorial/redis-node-client
 *
 * Start the server then telnet to port 8000
 * Register with NICK <nick>, use WHO to see others
 * Use TALKTO <nick> to initiate a chat. Send a message
 * using MSG <nick> <msg>. Note its important to do a
 * TALKTO so that both sides are listening. Use STOP <nick>
 * to stop talking to someone, and QUIT to exit.
 *
 * This code is in the public domain.
 */
var redis = require('./redis-node-client/lib/redis-client');

var sys = require('sys');
var net = require('net');

var server = net.createServer(function(stream) {
    var sub; // redis connection
    var pub;
    var registered = false;
    var nick = "";

    function channel(a,b) {
    return [a,b].sort().join(':');
    }

    function shareTable(other) {
    sys.debug(nick + ": Subscribing to "+channel(nick,other));
    sub.subscribeTo(channel(nick,other), function(channel, message) {
        var str = message.toString();
        var sender = str.slice(0, str.indexOf(':'));
        if( sender != nick )
        stream.write("[" + sender + "] " + str.substr(str.indexOf(':')+1) + "\n");
    });
    }

    function leaveTable(other) {
    sub.unsubscribeFrom(channel(nick,other), function(err) {
        stream.write("Stopped talking to " + other+ "\n");
    });
    }

    stream.addListener("connect", function() {
    sub = redis.createClient();
    pub = redis.createClient();
    });

    stream.addListener("data", function(data) {
    if( !registered ) {
        var msg = data.toString().match(/^NICK (\w*)/);
        if(msg) {
        stream.write("SERVER: Hi " + msg[1] + "\n");
        pub.sadd('mclarens:inside', msg[1], function(err) {
            if(err) {
            stream.end();
            }
            registered = true;
            nick = msg[1];
// server messages
            sub.subscribeTo( nick + ":info", function(nick, message) {
            var m = message.toString().split(' ');
            var cmd = m[0];
            var who = m[1];
            if( cmd == "start" ) {
                stream.write( who + " is now talking to you\n");
                shareTable(who);
            }
            else if( cmd == "stop" ) {
                stream.write( who + " stopped talking to you\n");
                leaveTable(who);
            }
            });
        });
        }
        else {
        stream.write("Please register with NICK <nickname>\n");
        }
        return;
    }

    var fragments = data.toString().replace('\r\n', '').split(' ');
    switch(fragments[0]) {
    case 'TALKTO':
        pub.publish(fragments[1]+":info", "start " + nick, function(a,b) {
        });
        shareTable(fragments[1]);
        break;
    case 'MSG':
        pub.publish(channel(nick, fragments[1]),
            nick + ':' +fragments.slice(2).join(' '),
              function(err, reply) {
              if(err) {
                  stream.write("ERROR!");
              }
              });
        break;
    case 'WHO':
        pub.smembers('mclarens:inside', function(err, users) {
        stream.write("Online:\n" + users.join('\n') + "\n");
        });
        break;
    case 'STOP':
        leaveTable(fragments[1]);
        pub.publish(fragments[1]+":info", "stop " + nick, function() {});
        break;
    case 'QUIT':
        stream.end();
        break;
    }
    });

    stream.addListener("end", function() {
    pub.publish(nick, nick + " is offline");
    pub.srem('mclarens:inside', nick, function(err) {
        if(err) {
        sys.debug("Could not remove client");
        }
    });
    });
});

server.listen(8000, "localhost");

Documents

There's a ton of documentation out there, and the apis are rapidly changing on this type of stack so you'll have to weigh the time relevance of each doc.

有大量的文档,在这种类型的堆栈上api正在快速变化,所以您必须权衡每个文档的时间相关性。

Related Questions

Just a few related questions, this is a hot topic on stack:

有几个相关的问题,这是一个热门话题:

Notable tips (ymmv)

Turn off or optimize socket pooling, use efficient bindings, monitor latency, and make sure you're not duplicating work (ie no need to publish to all listeners twice).

关闭或优化套接字池,使用高效的绑定,监视延迟,并确保不重复工作(即不需要向所有监听器发布两次)。