サーバー側にノードとエクスプレスを備えたangularjsアプリがあります。node-amqp と socket.io もあります
次の動作を実装したい
アプリには、リアルタイム データを含むテーブルを表示するページ (ルート、角度ビュー) があります。データは、socket.io と amqp を使用してリアルタイムで更新され、アプリの外部にある rabbitMQ サーバーからデータをストリーミングします。
ユーザーがブラウザでこのページ/ルートにアクセスしたとき
- クライアントがソケット イベント「subscribe」を発行します</li>
- サーバー, ソケットイベントで “サブスクライブ”,
- ウサギのキューを宣言します
- ウサギのキューを交換にバインドします
- ウサギのキューからのメッセージ/データを購読します
- ユーザー/クライアントにデータを送り返すソケットイベント「データ」を発行します
ユーザーがページを離れたとき、つまりルートを変更したとき
- クライアントがソケット イベント「unsubscribe」を発行します</li>
- サーバー, ソケットイベントで “登録解除”,
- キューから退会します
私の問題は次のとおりです: queue.subscribe と queue.unsubscribe が同期されていることを確認するにはどうすればよいですか? ユーザーが一連のルート変更をすばやく実行した場合: 訪問/脱退/訪問/脱退/訪問/脱退 加入と脱退の順序が逆になることがあり、サーバーは新しい加入が完了する前に前回の加入を 2 回脱退します。なにか提案を?これは私が試したものですが、機能していません:
クライアント側: controller.js
.controller('WatchdogCtrl', function($scope, watchSocket) {
var data = {}
$scope.data = []
var socket = watchSocket
socket.emit('subscribe', { exchange: 'bus', key: 'mis.service-state' })
socket.on('data', function(message) {
// refreshing data
data[message.payload.id] = message.payload;
var new-values = [];
angular.forEach(data, function(value, index) {
this.push(value);
}, new-values);
$scope.data = new-values
$scope.$apply()
});
$scope.$on('$destroy', function (event) {
// unsubscribe from rabbit queue when leaving
socket.emit('unsubscribe')
});
})
サーバー側: server.js
// set up amqp listener
var amqp = require('amqp');
// create rabbitmq connection with amqp
var rabbitMQ = amqp.createConnection({url: "amqp://my:url"});
rabbitMQ.on('ready', function() {
console.log('Connection to rabbitMQ is ready')
});
// Hook Socket.io into Express
var io = require('socket.io').listen(server);
io.set('log level', 2);
io.of('/watch').on('connection', function(socket) {
var watchq;
var defr;
socket.on('subscribe', function(spec) {
watchq = rabbitMQ.queue('watch-queue', function(queue) {
console.log('declare rabbit queue: "' + queue.name +'"');
console.log('bind queue '+ queue.name + ' to exch=' + spec.exchange + ', key=' + spec.key);
queue.bind(spec.exchange, spec.key)
defr = queue.subscribe(function(message, headers, deliveryInfo) {
socket.emit('data', {
key: deliveryInfo.routingKey,
payload: JSON.parse(message.data.toString('utf8'))
})
}).addCallback(function(ok) {
var ctag = ok.consumerTag;
console.log('subscribed to queue: ' + queue.name + ' ctag = ' + ctag)
});
})
})
socket.on('unsubscribe', function() {
//needs fix: this does not ensure subscribe/unsubscribe synchronization…..
defr.addCallback(function(ok) {
console.log('unsubscribe form queue:', watchq.name, ', ctag =', ok.consumerTag)
watchq.unsubscribe(ok.consumerTag);
})
})
});
サーバー console.log メッセージ: (visit#3 と leave#3 は同期していません)
declare rabbit queue: "watch-queue"
bind queue watch-queue to exch=bus, key=mis.service-state
subscribed to queue: watch-queue ctag = node-amqp-8359-0.6418165327049792 //<-- visit#1
unsubscribe form queue: watch-queue , ctag = node-amqp-8359-0.6418165327049792 //<--leave#1
declare rabbit queue: "watch-queue"
bind queue watch-queue to exch=bus, key=mis.service-state
subscribed to queue: watch-queue ctag = node-amqp-8359-0.455362161854282 //<-- visit#2
unsubscribe form queue: watch-queue , ctag = node-amqp-8359-0.455362161854282 //<-- leave#2
unsubscribe form queue: watch-queue , ctag = node-amqp-8359-0.455362161854282 //<-- leave#3
declare rabbit queue: "watch-queue"
bind queue watch-queue to exch=bus, key=mis.service-state
subscribed to queue: watch-queue ctag = node-amqp-8359-0.4509762797970325 //<-- visit#3