59

RabbitMQ の Java API docsを読んだところ、非常に有益でわかりやすいものでした。Channelシンプルなパブリッシュ/コンシュームをセットアップする方法の例は、非常に理解しやすく理解するのが簡単です。しかし、これは非常に単純で基本的な例であり、重要な疑問が残りました。複数のキューとの間で発行/消費するように1+ を設定するにはどうすればよいですか?Channels

と の 3 つのキューを持つ RabbitMQ サーバーがあるとloggingsecurity_eventsますcustomer_orders。そのため、3 つのキューすべてに発行/消費する機能を1 つ持つ必要があります。または、それぞれが 1 つのキュー専用のChannel3 つの個別の を持つ可能性が高くなります。Channels

これに加えて、RabbitMQ のベスト プラクティスでは、Channelコンシューマー スレッドごとに 1 つを設定する必要があります。この例では、security_eventsコンシューマ スレッドが 1 つだけで問題ないとしますがloggingcustomer_order両方ともボリュームを処理するために 5 つのスレッドが必要です。したがって、私が正しく理解している場合、それは次のものが必要であることを意味しますか?

  • との間でパブリッシュ/コンシュームするための1Channelおよび 1 つのコンシューマ スレッドsecurity_events。と
  • との間でパブリッシュ/コンシュームするための5Channelsおよび 5 つのコンシューマー スレッドlogging。と
  • との間で公開/消費するための5 つChannelsおよび 5 つのコンシューマ スレッドcustomer_orders

ここで私の理解が間違っている場合は、まず訂正してください。いずれにせよ、戦いに疲れたRabbitMQのベテランが、ここで私の要件を満たすパブリッシャー/コンシューマーを設定するための適切なコード例で「点をつなぐ」のを手伝ってくれるでしょうか? 前もって感謝します!

4

2 に答える 2

138
于 2013-09-09T15:09:04.870 に答える
25

複数のキューとの間でパブリッシュ/コンシュームするように 1 つ以上のチャネルを設定するにはどうすればよいですか?

スレッドとチャネルを使用して実装できます。必要なのは、物事を分類する方法です。つまり、ログインからのすべてのキュー アイテム、security_events からのすべてのキュー要素などです。分類は、routingKey を使用して達成できます。

つまり、アイテムをキューに追加するたびに、ルーティング キーを指定します。プロパティ要素として追加されます。これにより、 loggingなどの特定のイベントから値を取得できます。

次のコード サンプルは、クライアント側で実行する方法を説明しています。

例えば:

ルーティング キーは、チャネルのタイプを識別し、タイプを取得するために使用されます。

たとえば、タイプ Login に関するすべてのチャネルを取得する必要がある場合は、ルーティング キーを login またはそれを識別する他のキーワードとして指定する必要があります。

            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            string routingKey="login";

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

分類の詳細については、こちらを参照してください。


ねじ部

パブリッシング部分が終わったら、スレッド部分を実行できます..

この部分では、カテゴリに基づいて公開データを取得できます。すなわち; あなたの場合、ロギング、security_events、customer_ordersなどのルーティングキー。

スレッドでデータを取得する方法については、例を参照してください。

例:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
//**The threads part is as follows** 
 channel.exchangeDeclare(EXCHANGE_NAME, "direct");      
 String queueName = channel.queueDeclare().getQueue();
    // This part will biend the queue with the severity (login for eg:)
    for(String severity : argv){
              channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
    }
    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, "myConsumerTag",
    new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
                                Envelope envelope,
                                AMQP.BasicProperties properties,
                                byte[] body)
         throws IOException
     {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.contentType;
             long deliveryTag = envelope.getDeliveryTag();

             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
     }
 });

これで、ログイン (ルーティング キー) タイプのキュー内のデータを処理するスレッドが作成されました。これにより、複数のスレッドを作成できます。それぞれ異なる目的を果たします。

スレッド部分の詳細については、こちらをご覧ください。

于 2013-09-09T15:06:57.530 に答える