同時に 100 万人のユーザーがオンラインになっている分析システムを構築しています。メッセージブローカーなどのRabbitMQを使ってサーバーの容量を削減しています
ここに私の図があります
私のシステムには 3 つのコンポーネントが含まれています。
パブリッシャー サーバー : (プロデューサー)
このシステムは nodejs 上に構築されました。メッセージを発行するこのシステムの目的queue
RabbitMQ キュー: このシステムは、送信されたメッセージを格納しますpublisher server
。その後、 のキューからメッセージを送信するために 1 つの接続が開かれますsubscriber server
。
サブスクライバー サーバー (コンシューマー) : このシステムはメッセージを受信します。queue
パブリッシャー サーバーのソース コード
var amqp = require('amqplib/callback_api');
amqp.connect("amqp://localhost", function(error, connect) {
if (error) {
return callback(-1, null);
} else {
connect.createChannel(function(error, channel) {
if (error) {
return callback(-3, null);
} else {
var q = 'logs';
var msg = data; // object
// convert msg object to buffer
var new_msg = Buffer.from(JSON.stringify(msg), 'binary');
channel.assertExchange(q, 'fanout', { durable: false });
channel.publish(q, 'message_queues', new Buffer(new_msg));
console.log(" [x] Sent %s", new_msg);
return callback(null, msg);
}
});
}
});
すべてのコンシューマにブロードキャストを送信する排他的交換
"message_queues"
を作成します"fanout"
サブスクライバー サーバーのソース コード
var amqp = require('amqplib/callback_api');
amqp.connect("amqp://localhost", function(error, connect) {
if (error) {
console.log('111');
} else {
connect.createChannel(function(error, channel) {
if (error) {
console.log('1');
} else {
var ex = 'logs';
channel.assertExchange(ex, 'fanout', { durable: false });
channel.assertQueue('message_queues', { exclusive: true }, function(err, q) {
if (err) {
console.log('123');
} else {
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
channel.bindQueue(q.queue, ex, 'message_queues');
channel.consume(q.queue, function(msg) {
console.log(" [x] %s", msg.content.toString());
}, { noAck: true });
}
});
}
});
}
});
"message_queues"
取引所からメッセージを受け取る
メッセージを送信するときに実装します。システムは正常に動作しますが、このシステムのベンチマーク テスト パフォーマンスを試してみたところ (1 秒あたり最大 1000 ユーザーがリクエストを送信)、システムに問題がありました。システムが過負荷/バッファ オーバーフロー (または何かがうまく機能しない) のように見えます。
2日前にrabbitmqについて読んだだけです。私はそのチュートリアルが基本的な例であることを知っているので、現実の世界でシステムを構築するための助けが必要です.. 解決策と提案
私の質問が理にかなっていることを願っています