187

私はRabbitMQとAMQPを一般的に使い始めたばかりです。

  • メッセージのキューがあります
  • 複数の消費者がいますが、同じメッセージでさまざまなことをしたいと思っています。

RabbitMQのドキュメントのほとんどは、ラウンドロビンに焦点を当てているようです。つまり、単一のメッセージが単一のコンシューマーによって消費され、各コンシューマー間で負荷が分散されます。これは確かに私が目撃した行動です。

例:プロデューサーには単一のキューがあり、2秒ごとにメッセージを送信します。

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

そしてここに消費者がいます:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

コンシューマーを2回起動すると、各コンシューマーがラウンドロビン動作で代替メッセージを消費していることがわかります。たとえば、一方の端末にメッセージ1、3、5、もう一方の端末に2、4、6のメッセージが表示されます

私の質問は:

  • 各コンシューマーに同じメッセージを受信させることはできますか?つまり、両方の消費者がメッセージ1、2、3、4、5、6を受け取りますか?これはAMQP/RabbitMQで何と呼ばれていますか?通常はどのように構成されていますか?

  • これは一般的に行われますか?代わりに、交換でメッセージを2つの別々のキューにルーティングし、単一のコンシューマーを使用する必要がありますか?

4

12 に答える 12

144

各コンシューマーに同じメッセージを受信させることはできますか?つまり、両方の消費者がメッセージ1、2、3、4、5、6を受け取りますか?これはAMQP/RabbitMQで何と呼ばれていますか?通常はどのように構成されていますか?

いいえ、消費者が同じキューにいる場合は違います。RabbitMQのAMQPコンセプトガイドから:

AMQP 0-9-1では、メッセージはコンシューマ間で負荷分散されることを理解することが重要です。

これは、キュー内のラウンドロビン動作が指定されたものであり、構成できないことを意味しているようです。つまり、同じメッセージIDを複数のコンシューマーで処理するには、別々のキューが必要です。

これは一般的に行われますか?代わりに、交換でメッセージを2つの別々のキューにルーティングし、単一のコンシューマーを使用する必要がありますか?

いいえ、そうではありません。各コンシューマーが同じメッセージIDを処理する単一のキュー/複数のコンシューマーは使用できません。交換でメッセージを2つの別々のキューにルーティングすることは、確かに優れています。

あまり複雑なルーティングは必要ないので、ファンアウト交換はこれをうまく処理します。node-amqpには「デフォルトの交換」の概念があり、メッセージを接続に直接公開できますが、ほとんどのAMQPメッセージは特定の交換に公開されるため、以前はExchangeにあまり焦点を当てていませんでした。

これが私のファンアウト交換で、送信と受信の両方です。

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   

    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })

    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})
于 2012-05-16T15:14:13.767 に答える
58

最後のいくつかの答えはほぼ正しいです-私はメッセージを生成するアプリをたくさん持っているので、プロセスは非常に簡単です。

複数のコンシューマーに同じメッセージを送信する場合は、次の手順を実行します。

メッセージを受信するアプリごとに1つずつ、複数のキューを作成します。各キューのプロパティで、ルーティングタグをamq.direct交換に「バインド」します。公開アプリを変更してamq.directに送信し、ルーティングタグ(キューではない)を使用します。次に、AMQPは、同じバインディングを使用してメッセージを各キューにコピーします。魅力のように機能します:)

例:生成したJSON文字列があり、ルーティングタグ「new-sales-order」を使用して「amq.direct」エクスチェンジに公開するとします。order_printerアプリのキューがあり、注文を出力します。注文のコピーを送信してクライアントに請求する請求システムのキューと、履歴/コンプライアンスの理由で注文をアーカイブするWebアーカイブシステムがあり、他の情報が入ってくると注文が追跡されるクライアントWebインターフェイスがありますオーダー。

つまり、私のキューは次のとおりです。order_printer、order_billing、order_archive、order_trackingすべてにバインドタグ「new-sales-order」がバインドされており、4つすべてがJSONデータを取得します。

これは、公開アプリが受信アプリを知らない、または気にせずにデータを送信するための理想的な方法です。

于 2015-09-14T01:17:02.517 に答える
37

ただrabbitmqチュートリアルを読んでください。キューではなく、交換するためにメッセージを公開します。その後、適切なキューにルーティングされます。あなたの場合、コンシューマーごとに別々のキューをバインドする必要があります。そうすれば、メッセージを完全に独立して消費できます。

于 2012-05-16T15:11:01.927 に答える
8

はい、各コンシューマーは同じメッセージを受信できます。http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmqを ご覧ください。 com / tutorials / tutorial-five-python.html

メッセージをルーティングするさまざまな方法。私はそれらがpythonとjava用であることを知っていますが、原則を理解し、何をしているのかを決定し、JSでそれを行う方法を見つけるのは良いことです。単純なファンアウト(チュートリアル3 )を実行したいようです。このファンアウトは、交換に接続されているすべてのキューにメッセージを送信します。

あなたがしていることとあなたがしたいこととの違いは、基本的にあなたがセットアップして交換するか、ファンアウトをタイプすることです。ファンアウトエクスカンジは、接続されているすべてのキューにすべてのメッセージを送信します。各キューには、すべてのメッセージに個別にアクセスできるコンシューマーがあります。

はい、これは一般的に行われています。これはAMPQの機能の1つです。

于 2012-05-16T15:08:40.340 に答える
7

送信パターンは1対1の関係です。複数のレシーバーに「送信」する場合は、pub/subパターンを使用する必要があります。詳細については、 http://www.rabbitmq.com/tutorials/tutorial-three-python.htmlを参照してください。

于 2012-05-16T15:09:31.293 に答える
3

RabbitMQ / AMQP:単一のキュー、同じメッセージとページの更新のための複数のコンシューマー。

rabbit.on('ready', function () {    });
    sockjs_chat.on('connection', function (conn) {

        conn.on('data', function (message) {
            try {
                var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));

                if (obj.header == "register") {

                    // Connect to RabbitMQ
                    try {
                        conn.exchange = rabbit.exchange(exchange, { type: 'topic',
                            autoDelete: false,
                            durable: false,
                            exclusive: false,
                            confirm: true
                        });

                        conn.q = rabbit.queue('my-queue-'+obj.agentID, {
                            durable: false,
                            autoDelete: false,
                            exclusive: false
                        }, function () {
                            conn.channel = 'my-queue-'+obj.agentID;
                            conn.q.bind(conn.exchange, conn.channel);

                            conn.q.subscribe(function (message) {
                                console.log("[MSG] ---> " + JSON.stringify(message));
                                conn.write(JSON.stringify(message) + "\n");
                            }).addCallback(function(ok) {
                                ctag[conn.channel] = ok.consumerTag; });
                        });
                    } catch (err) {
                        console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
                    }

                } else if (obj.header == "typing") {

                    var reply = {
                        type: 'chatMsg',
                        msg: utils.escp(obj.msga),
                        visitorNick: obj.channel,
                        customField1: '',
                        time: utils.getDateTime(),
                        channel: obj.channel
                    };

                    conn.exchange.publish('my-queue-'+obj.agentID, reply);
                }

            } catch (err) {
                console.log("ERROR ----> " + err.stack);
            }
        });

        // When the visitor closes or reloads a page we need to unbind from RabbitMQ?
        conn.on('close', function () {
            try {

                // Close the socket
                conn.close();

                // Close RabbitMQ           
               conn.q.unsubscribe(ctag[conn.channel]);

            } catch (er) {
                console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
            }
        });
    });
于 2014-05-14T10:39:15.443 に答える
2

私があなたのケースを評価すると、次のようになります。

  • メッセージのキューがあります(メッセージを受信するためのソース、q111という名前を付けましょう)

  • 私には複数の消費者がいますが、同じメッセージでさまざまなことをしたいと思っています。

ここでの問題は、このキューで3つのメッセージが受信されている間、メッセージ1はコンシューマーAによって消費され、他のコンシューマーBおよびCはメッセージ2および3を消費します。これら3つのメッセージ(1,2,3)はすべて、接続されている3つのコンシューマー(A、B、C)すべてに同時に送信されます。

これを実現するために多くの構成を行うことができますが、簡単な方法は、次の2つのステップの概念を使用することです。

  • 動的なrabbitmq-shovelを使用して、目的のキュー(q111)からメッセージをピックアップし、ファンアウトエクスチェンジ(この目的専用に作成および専用化されたエクスチェンジ)に公開します。
  • 次に、コンシューマーA、B、およびC(queue(q111)をリッスンしていた)を再構成して、各コンシューマーの排他的で匿名のキューを使用して、このファンアウト交換から直接リッスンします。

注:この概念を使用している間は、ソースキュー(q111)から直接消費しないでください。すでに消費されたメッセージは、ファンアウト交換にシャベルされません。

これがあなたの正確な要件を満たさないと思うなら...あなたの提案を投稿してください:-)

于 2014-08-12T06:59:08.627 に答える
2

ファンアウトエクスチェンジャーを使用してメッセージを送信することを確認する必要があると思います。そうすれば、異なるコンシューマーに対して同じメッセージを受信できます。テーブルの下で、RabbitMQは、この新しいコンシューマー/サブスクライバーごとに異なるキューを作成しています。

これは、javascripthttps://www.rabbitmq.com/tutorials/tutorial-one-javascript.htmlのチュートリアル例を参照するためのリンクです

于 2016-11-04T18:46:19.557 に答える
1

必要な動作を得るには、各コンシューマーに独自のキューから消費させるだけです。すべてのキューにメッセージを一度に送信するには、非直接交換タイプ(トピック、ヘッダー、ファンアウト)を使用する必要があります。

于 2012-11-01T20:11:57.867 に答える
1

私のようにamqplibライブラリを使用している場合は、 RabbitMQチュートリアルのパブリッシュ/サブスクライブの実装の便利な例があります。

于 2015-03-02T21:46:55.423 に答える
1

このシナリオには、ここでの回答では見つけられなかった興味深いオプションが1つあります。

あるコンシューマーで「再キューイング」機能を使用してメッセージをナックし、別のコンシューマーでメッセージを処理できます。一般的に言って、それは正しい方法ではありませんが、誰かにとっては十分かもしれません。

https://www.rabbitmq.com/nack.html

そして、ループに注意してください(すべてのコンキュマーがメッセージをナック+リキューするとき)!

于 2018-12-13T12:26:08.767 に答える
1

ファンアウトは明らかにあなたが望んでいたものでした。fanout

rabbitMQチュートリアルを読む: https ://www.rabbitmq.com/tutorials/tutorial-three-javascript.html

これが私の例です:

Publisher.js:

amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
    if (error0) {
      throw error0;
    }
    console.log('RabbitMQ connected')
    try {
      // Create exchange for queues
      channel = await connection.createChannel()
      await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
      await channel.publish(process.env.EXCHANGE_NAME, '', Buffer.from('msg'))
    } catch(error) {
      console.error(error)
    }
})

Subscriber.js:

amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
    if (error0) {
      throw error0;
    }
    console.log('RabbitMQ connected')
    try {
      // Create/Bind a consumer queue for an exchange broker
      channel = await connection.createChannel()
      await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
      const queue = await channel.assertQueue('', {exclusive: true})
      channel.bindQueue(queue.queue, process.env.EXCHANGE_NAME, '')

      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C");
      channel.consume('', consumeMessage, {noAck: true});
    } catch(error) {
      console.error(error)
    }
});

これが私がインターネットで見つけた例です。多分また助けることができます。 https://www.codota.com/code/javascript/functions/amqplib/Channel/assertExchange

于 2020-09-06T12:29:21.893 に答える