68

複数のトランスポートを処理できるサーバー イベントによって駆動されるリアルタイムの Web ブロードキャスト システムを作成する試みで socket.io/node.js と redis pub/sub を結合する場合、次の 3 つのアプローチがあるようです。

  1. 'createClient' redis 接続を作成し、チャネルにサブスクライブします。socket.io クライアント接続で、クライアントを socket.io ルームに参加させます。redis.on("message", ...) イベントで、 io.sockets.in(room).emit("event", data) を呼び出して、関連するルーム内のすべてのクライアントに配信します。socket.ioでredis接続を再利用する方法は?

  2. 'createClient' redis 接続。socket.io クライアント接続で、クライアントを socket.io ルームに参加させ、関連する redis チャネルにサブスクライブします。クライアント接続クロージャー内に redis.on("message", ...) を含め、メッセージの受信時に client.emit("event", data) を呼び出して、特定のクライアントでイベントを発生させます。socket.io で RedisStore を使用する例の回答のように

  3. socket.io に焼き付けられた RedisStore を使用し、socketio-spec プロトコルに従って、Redis の単一の「ディスパッチ」チャネルから「ブロードキャスト」します。

番号 1 では、Redis サブと関連するイベントをすべてのクライアントに対して 1 回処理できます。2 番目は、Redis pub/sub へのより直接的なフックを提供します。番号 3 はより単純ですが、メッセージング イベントをほとんど制御できません。

ただし、私のテストでは、複数のクライアントが接続されていると、すべてが予想外に低いパフォーマンスを示します。問題のサーバー イベントは、1,000 個のメッセージが redis チャネルにできるだけ早く発行され、できるだけ早く配布されることです。パフォーマンスは、接続されたクライアント (分析のためにタイムスタンプを Redis リストに記録する socket.io-client ベース) でのタイミングによって測定されます。

オプション1では、サーバーがメッセージを受信し、接続されているすべてのクライアントに順次書き込みます。オプション 2 では、サーバーは各メッセージを複数回 (クライアント サブスクリプションごとに 1 回) 受信し、関連するクライアントに書き込みます。いずれの場合も、接続されているすべてのクライアントに通知されるまで、サーバーは 2 番目のメッセージ イベントに到達しません。同時実行数の増加に伴い、状況が明らかに悪化しました。

これは、スタック機能の認識された知恵と矛盾しているようです。信じたいけど難しい。

このシナリオ (大量のメッセージの低遅延配信) は、これらのツールのオプションではないのでしょうか (まだ?)、それともトリックを見逃しているのでしょうか?

4

1 に答える 1

30

これは合理的な質問だと思い、しばらく前に簡単に調査しました。少し時間をかけて、役立つヒントを得ることができる例を探しました。

私は簡単な例から始めるのが好きです:

ライト サンプルは単一のページです (redis-node-client を、Matt Ranneyによる node_redis のようなものに置き換えることに注意してください。

/*
 * Mclarens Bar: Redis based Instant Messaging
 * Nikhil Marathe - 22/04/2010

 * A simple example of an IM client implemented using
 * Redis PUB/SUB commands so that all the communication
 * is offloaded to Redis, and the node.js code only
 * handles command interpretation,presentation and subscribing.
 * 
 * Requires redis-node-client and a recent version of Redis
 *    http://code.google.com/p/redis
 *    http://github.com/fictorial/redis-node-client
 *
 * Start the server then telnet to port 8000
 * Register with NICK <nick>, use WHO to see others
 * Use TALKTO <nick> to initiate a chat. Send a message
 * using MSG <nick> <msg>. Note its important to do a
 * TALKTO so that both sides are listening. Use STOP <nick>
 * to stop talking to someone, and QUIT to exit.
 *
 * This code is in the public domain.
 */
var redis = require('./redis-node-client/lib/redis-client');

var sys = require('sys');
var net = require('net');

var server = net.createServer(function(stream) {
    var sub; // redis connection
    var pub;
    var registered = false;
    var nick = "";

    function channel(a,b) {
    return [a,b].sort().join(':');
    }

    function shareTable(other) {
    sys.debug(nick + ": Subscribing to "+channel(nick,other));
    sub.subscribeTo(channel(nick,other), function(channel, message) {
        var str = message.toString();
        var sender = str.slice(0, str.indexOf(':'));
        if( sender != nick )
        stream.write("[" + sender + "] " + str.substr(str.indexOf(':')+1) + "\n");
    });
    }

    function leaveTable(other) {
    sub.unsubscribeFrom(channel(nick,other), function(err) {
        stream.write("Stopped talking to " + other+ "\n");
    });
    }

    stream.addListener("connect", function() {
    sub = redis.createClient();
    pub = redis.createClient();
    });

    stream.addListener("data", function(data) {
    if( !registered ) {
        var msg = data.toString().match(/^NICK (\w*)/);
        if(msg) {
        stream.write("SERVER: Hi " + msg[1] + "\n");
        pub.sadd('mclarens:inside', msg[1], function(err) {
            if(err) {
            stream.end();
            }
            registered = true;
            nick = msg[1];
// server messages
            sub.subscribeTo( nick + ":info", function(nick, message) {
            var m = message.toString().split(' ');
            var cmd = m[0];
            var who = m[1];
            if( cmd == "start" ) {
                stream.write( who + " is now talking to you\n");
                shareTable(who);
            }
            else if( cmd == "stop" ) {
                stream.write( who + " stopped talking to you\n");
                leaveTable(who);
            }
            });
        });
        }
        else {
        stream.write("Please register with NICK <nickname>\n");
        }
        return;
    }

    var fragments = data.toString().replace('\r\n', '').split(' ');
    switch(fragments[0]) {
    case 'TALKTO':
        pub.publish(fragments[1]+":info", "start " + nick, function(a,b) {
        });
        shareTable(fragments[1]);
        break;
    case 'MSG':
        pub.publish(channel(nick, fragments[1]),
            nick + ':' +fragments.slice(2).join(' '),
              function(err, reply) {
              if(err) {
                  stream.write("ERROR!");
              }
              });
        break;
    case 'WHO':
        pub.smembers('mclarens:inside', function(err, users) {
        stream.write("Online:\n" + users.join('\n') + "\n");
        });
        break;
    case 'STOP':
        leaveTable(fragments[1]);
        pub.publish(fragments[1]+":info", "stop " + nick, function() {});
        break;
    case 'QUIT':
        stream.end();
        break;
    }
    });

    stream.addListener("end", function() {
    pub.publish(nick, nick + " is offline");
    pub.srem('mclarens:inside', nick, function(err) {
        if(err) {
        sys.debug("Could not remove client");
        }
    });
    });
});

server.listen(8000, "localhost");

ドキュメント

そこには大量のドキュメントがあり、API はこのタイプのスタックで急速に変化しているため、各ドキュメントの時間関連性を比較検討する必要があります。

関連する質問

関連する質問がいくつかありますが、これはスタックに関するホットなトピックです。

特筆すべきヒント (ymmv)

ソケット プールをオフにするか最適化し、効率的なバインディングを使用し、レイテンシを監視し、作業が重複していないことを確認します (つまり、すべてのリスナーに 2 回発行する必要はありません)。

于 2012-06-13T21:31:51.190 に答える