1

同時に 100 万人のユーザーがオンラインになっている分析システムを構築しています。メッセージブローカーなどのRabbitMQを使ってサーバーの容量を削減しています

ここに私の図があります

ここに画像の説明を入力

私のシステムには 3 つのコンポーネントが含まれています。

パブリッシャー サーバー : (プロデューサー) このシステムは nodejs 上に構築されました。メッセージを発行するこのシステムの目的queue

RabbitMQ キュー: このシステムは、送信されたメッセージを格納しますpublisher server。その後、 のキューからメッセージを送信するために 1 つの接続が開かれますsubscriber server

サブスクライバー サーバー (コンシューマー) : このシステムはメッセージを受信します。queue

パブリッシャー サーバーのソース コード

var amqp = require('amqplib/callback_api');
amqp.connect("amqp://localhost", function(error, connect) {
    if (error) {
        return callback(-1, null);
    } else {
       connect.createChannel(function(error, channel) {
       if (error) {
           return callback(-3, null);
       } else {
         var q = 'logs';
         var msg = data; // object
         // convert msg object to buffer 
         var new_msg = Buffer.from(JSON.stringify(msg), 'binary');

        channel.assertExchange(q, 'fanout', { durable: false });
        channel.publish(q, 'message_queues', new Buffer(new_msg));
       console.log(" [x] Sent %s", new_msg);
        return callback(null, msg);
      }
    });
   }
}); 

すべてのコンシューマにブロードキャストを送信する排他的交換"message_queues"を作成します"fanout"

サブスクライバー サーバーのソース コード

var amqp = require('amqplib/callback_api');
amqp.connect("amqp://localhost", function(error, connect) {
    if (error) {
        console.log('111');
    } else {
        connect.createChannel(function(error, channel) {
            if (error) {
                console.log('1');
            } else {
                var ex = 'logs';

                channel.assertExchange(ex, 'fanout', { durable: false });
                channel.assertQueue('message_queues', { exclusive: true }, function(err, q) {
                    if (err) {
                        console.log('123');
                    } else {
                        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
                        channel.bindQueue(q.queue, ex, 'message_queues');

                        channel.consume(q.queue, function(msg) {
                            console.log(" [x] %s", msg.content.toString());

                        }, { noAck: true });

                    }
                });
            }
        });
    }

});

"message_queues"取引所からメッセージを受け取る

メッセージを送信するときに実装します。システムは正常に動作しますが、このシステムのベンチマーク テスト パフォーマンスを試してみたところ (1 秒あたり最大 1000 ユーザーがリクエストを送信)、システムに問題がありました。システムが過負荷/バッファ オーバーフロー (または何かがうまく機能しない) のように見えます。

2日前にrabbitmqについて読んだだけです。私はそのチュートリアルが基本的な例であることを知っているので、現実の世界でシステムを構築するための助けが必要です.. 解決策と提案

私の質問が理にかなっていることを願っています

4

1 に答える 1