2

サーバー側にノードとエクスプレスを備えたangularjsアプリがあります。node-amqp と socket.io もあります

次の動作を実装したい

アプリには、リアルタイム データを含むテーブルを表示するページ (ルート、角度ビュー) があります。データは、socket.io と amqp を使用してリアルタイムで更新され、アプリの外部にある rabbitMQ サーバーからデータをストリーミングします。

ユーザーがブラウザでこのページ/ルートにアクセスしたとき

  1. クライアントがソケット イベント「subscribe」を発行します</li>
  2. サーバー, ソケットイベントで “サブスクライブ”,
    • ウサギのキューを宣言します
    • ウサギのキューを交換にバインドします
    • ウサギのキューからのメッセージ/データを購読します
    • ユーザー/クライアントにデータを送り返すソケットイベント「データ」を発行します

ユーザーがページを離れたとき、つまりルートを変更したとき

  1. クライアントがソケット イベント「unsubscribe」を発行します</li>
  2. サーバー, ソケットイベントで “登録解除”,
    • キューから退会します

私の問題は次のとおりです: queue.subscribe と queue.unsubscribe が同期されていることを確認するにはどうすればよいですか? ユーザーが一連のルート変更をすばやく実行した場合: 訪問/脱退/訪問/脱退/訪問/脱退 加入と脱退の順序が逆になることがあり、サーバーは新しい加入が完了する前に前回の加入を 2 回脱退します。なにか提案を?これは私が試したものですが、機能していません:

クライアント側: controller.js

.controller('WatchdogCtrl', function($scope, watchSocket) {

    var data = {}
    $scope.data = []

    var socket = watchSocket

    socket.emit('subscribe', { exchange: 'bus', key: 'mis.service-state' })
    socket.on('data', function(message) {
        // refreshing  data 
        data[message.payload.id] = message.payload;
        var new-values = [];
        angular.forEach(data, function(value, index) {
            this.push(value);
        }, new-values);

        $scope.data = new-values
        $scope.$apply()
    });

    $scope.$on('$destroy', function (event) {
        // unsubscribe from rabbit queue when leaving 
        socket.emit('unsubscribe')
    });
})

サーバー側: server.js

// set up amqp listener
var amqp = require('amqp');
// create rabbitmq connection with amqp
var rabbitMQ = amqp.createConnection({url: "amqp://my:url"});
rabbitMQ.on('ready', function() {
    console.log('Connection to rabbitMQ is ready')
});

// Hook Socket.io into Express
var io = require('socket.io').listen(server);
io.set('log level', 2);
io.of('/watch').on('connection', function(socket) {
    var watchq;
    var defr;
    socket.on('subscribe', function(spec) {
        watchq = rabbitMQ.queue('watch-queue', function(queue) {
            console.log('declare rabbit queue: "' + queue.name +'"');
            console.log('bind queue '+ queue.name + ' to exch=' + spec.exchange + ', key=' + spec.key);

            queue.bind(spec.exchange, spec.key)
            defr = queue.subscribe(function(message, headers, deliveryInfo) {
                     socket.emit('data', {
                        key: deliveryInfo.routingKey,
                        payload: JSON.parse(message.data.toString('utf8'))
                     })
                   }).addCallback(function(ok) { 
                       var ctag = ok.consumerTag; 
                       console.log('subscribed to queue: ' + queue.name + ' ctag = ' + ctag)
                   });

        })
    })

    socket.on('unsubscribe', function() {
        //needs fix: this does not ensure subscribe/unsubscribe synchronization…..
        defr.addCallback(function(ok) {
            console.log('unsubscribe form queue:', watchq.name, ', ctag =', ok.consumerTag)
            watchq.unsubscribe(ok.consumerTag);
        })
    })

});

サーバー console.log メッセージ: (visit#3 と leave#3 は同期していません)

declare rabbit queue: "watch-queue"
bind queue watch-queue to exch=bus, key=mis.service-state
subscribed to queue: watch-queue ctag = node-amqp-8359-0.6418165327049792 //<-- visit#1
unsubscribe form queue: watch-queue , ctag = node-amqp-8359-0.6418165327049792 //<--leave#1
declare rabbit queue: "watch-queue"
bind queue watch-queue to exch=bus, key=mis.service-state
subscribed to queue: watch-queue ctag = node-amqp-8359-0.455362161854282 //<-- visit#2
unsubscribe form queue: watch-queue , ctag = node-amqp-8359-0.455362161854282 //<-- leave#2
unsubscribe form queue: watch-queue , ctag = node-amqp-8359-0.455362161854282 //<-- leave#3
declare rabbit queue: "watch-queue"
bind queue watch-queue to exch=bus, key=mis.service-state
subscribed to queue: watch-queue ctag = node-amqp-8359-0.4509762797970325 //<-- visit#3
4

1 に答える 1

3

私たちはあなたのものと非常によく似た設定をしています。未使用の場合は有効期限のある匿名の専用キューを作成します。匿名キューには、ブローカーによって生成された一意の名前が付けられます。排他キューは、クライアントが切断されるとすぐに削除されます (チャネルが破棄されるとすぐに)。キューの有効期限は RabbitMQ の拡張機能ですが、私たちが使用する amqplib でサポートされています。node-amqp も、そのような拡張機能を何らかの形でサポートしていると確信しています。

また、ソケットごとにチャネルを作成します (ただし、同じ接続を再利用します)。これにより、ソケットと匿名キューの間の 1 対 1 のマッピングが提供されます。そのキューへのバインドは、単一のソケットのバインドと同等です。このため、キューの特別な命名規則やルーティングキーのチェックなどを行わなくても、どのソケットがどのメッセージを受け取るべきかを本質的に知っています。

ソケットが閉じられたら、RabbitMQ チャネル (接続ではなく) を閉じます。特別な配信停止イベントは必要ありませんが、後日そのようなイベントを追加する可能性があります。

これは、競合状態がなく複数のタブが開いている場合、同じブラウザーが複数のキューを持つことができることも意味します。

于 2014-02-11T14:36:30.957 に答える