私はNodeJSとRabbitMQに取り組んでいます-
- マスター NodeJS サービス (1 つのインスタンス) データをキューにプッシュします。
- スレーブ NodeJS サービス (複数のインスタンス) は、データを消費して処理します。
- デフォルトの交換が使用されています。
スレーブ サービスは PM2 クラスター モードで実行されています。つまり、実行中のスレーブ サービスのインスタンスが 8 つあるということです。
私の予想では、マスター サービスがキューを介してデータをプッシュし始めると、スレーブ サービスはそれらを非同期的に消費し始めるはずです。
例として、マスター サービスがキューを介して 10 個のジョブをプッシュし、各ジョブが完了するまでに 5 秒かかる場合、スレーブはジョブを完了するのに 50 秒かかります。
これは、複数のスレーブを使用する目的を完全に無効にします。通常、スレーブは一度に 8 つのジョブを取得する必要があるためです。
RabbitMQ ダッシュボードによると、上記のセットアップでは -
- 9 接続 (1 マスター + 8 スレーブ)
- 9 チャネル (1 マスター + 8 スレーブ)
- 1 キュー
セットアップ全体でデフォルトの交換が使用されます。
私の質問は -
スレーブがキューからデータを非同期的に読み取ることができないのはなぜですか?
キュー内の次のアイテムに設定noAck
してもtrue
、現在のアイテムが処理されるまでピックアップされません
私の意図は、ユーザーの複数のスレーブ インスタンスによるキューの消費率をスケールアップすることですが、ここで何かが欠けていると思います。
これがコードベースです-
const rabbitMq = require("amqplib");
class RabbitMQClient {
async connect() {
this.connection = await rabbitMq.connect("amqp://admin:password@localhost");
this.channel = await this.connection.createChannel();
await this.channel.assertQueue("TEST_QUEUE");
}
}
// MASTER CODE (This Runs In Fork Mode - 1 Instance)
const master_client = new RabbitMQClient();
master_client.connect().then(() => {
// sending in 10 messages
for (let index = 1; index <= 10; index++) {
const data = Buffer.from(index.toString(), "utf8");
master_client.channel.sendToQueue("TEST_QUEUE", data);
}
});
// SLAVE CODE (This Runs In Cluster Mode - 8 Instances)
const slave_client = new RabbitMQClient();
// connect to rabbitmq
slave_client.connect().then(() => {
// consume the messages
slave_client.channel.consume("TEST_QUEUE", (data) => {
// timeout to add delay
setTimeout(() => {
RabbitMQClient._channel.ack(data);
}, 5000);
});
});
スレーブ出力 -
33|slave | 2020-11-02 13:19:09.293 +00:00: recieved message - 1 (13-19-09)
34|slave | 2020-11-02 13:19:14.293 +00:00: recieved message - 2 (13-19-14)
35|slave | 2020-11-02 13:19:19.299 +00:00: recieved message - 3 (13-19-19)
36|slave | 2020-11-02 13:19:24.299 +00:00: recieved message - 4 (13-19-24)
37|slave | 2020-11-02 13:19:29.300 +00:00: recieved message - 5 (13-19-29)
38|slave | 2020-11-02 13:19:34.299 +00:00: recieved message - 6 (13-19-34)
39|slave | 2020-11-02 13:19:39.301 +00:00: recieved message - 7 (13-19-39)
40|slave | 2020-11-02 13:19:44.301 +00:00: recieved message - 8 (13-19-44)
33|slave | 2020-11-02 13:19:49.299 +00:00: recieved message - 9 (13-19-49)
34|slave | 2020-11-02 13:19:54.300 +00:00: recieved message - 10 (13-19-54)
お気づきのように、さまざまなスレーブがラウンド ロビン方式でメッセージを取得していますが、それらは同期的に動作しています。
ありがとう!