49

Python、Pika、RabbitMQ で遅延 (またはパーキング) キューを作成する最も簡単な方法は何ですか? 同様の質問を見たことがありますが、Python についてはありません。

再度キューに入れる必要があるメッセージを抑制することができるため、アプリケーションを設計するときにこれは便利なアイデアだと思います。

処理しきれないほど多くのメッセージを受信する可能性は常にあります。HTTP サーバーが遅いか、データベースに負荷がかかりすぎている可能性があります。

また、メッセージを失うことをまったく許容できないシナリオで何か問題が発生したときに、処理できなかったメッセージを再キューイングすることで問題が解決する可能性がある場合にも、非常に役立つことがわかりました。また、メッセージが何度もキューに入れられるという問題が発生する可能性もあります。潜在的にパフォーマンスの問題を引き起こし、スパムをログに記録します。

4

6 に答える 6

99

これは、アプリケーションを開発するときに非常に便利であることがわかりました。メッセージを単に再キューイングする代わりの方法を提供するためです。これにより、コードの複雑さを簡単に軽減でき、RabbitMQ の多くの強力な隠し機能の 1 つです。

手順

最初に、メイン キュー用と遅延キュー用の 2 つの基本チャネルを設定する必要があります。最後の例では、必須ではありませんがコードの信頼性を高めるフラグをいくつか追加しています。confirm deliverydelivery_modeおよびなどdurable。これらの詳細については、RabbitMQ のマニュアルを参照してください。

チャネルを設定したら、遅延チャネルからメイン キューにメッセージを送信するために使用できるバインディングをメイン チャネルに追加します。

channel.queue_bind(exchange='amq.direct',
                   queue='hello')

次に、期限切れになったメッセージをメイン キューに転送するように遅延チャネルを構成する必要があります。

delay_channel.queue_declare(queue='hello_delay', durable=True,  arguments={
  'x-message-ttl' : 5000,
  'x-dead-letter-exchange' : 'amq.direct',
  'x-dead-letter-routing-key' : 'hello'
})
  • x-message-ttl (メッセージ - 生存時間)

    これは通常、特定の期間が経過した後にキュー内の古いメッセージを自動的に削除するために使用されますが、オプションの引数を 2 つ追加することで、この動作を変更できます。代わりに、このパラメーターで、メッセージが遅延キューに留まる時間をミリ秒単位で決定できます。

  • x-dead-letter-routing-key

    この変数を使用すると、メッセージを完全に削除するデフォルトの動作ではなく、有効期限が切れたメッセージを別のキューに転送できます。

  • x-dead-letter-exchange

    この変数は、メッセージを hello_delay から hello キューに転送するために使用された Exchange を決定します。

遅延キューへの発行

すべての基本的な Pika パラメーターの設定が完了したら、基本的なパブリッシュを使用して遅延キューにメッセージを送信するだけです。

delay_channel.basic_publish(exchange='',
                      routing_key='hello_delay',
                      body="test",
                      properties=pika.BasicProperties(delivery_mode=2))

スクリプトを実行すると、RabbitMQ 管理モジュールに作成された次のキューが表示されます。 ここに画像の説明を入力

例。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_delivery()
channel.queue_declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer 
# messages from our delay queue.
channel.queue_bind(exchange='amq.direct',
                   queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_delivery()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue_declare(queue='hello_delay', durable=True,  arguments={
  'x-message-ttl' : 5000, # Delay until the message is transferred in milliseconds.
  'x-dead-letter-exchange' : 'amq.direct', # Exchange used to transfer the message from A to B.
  'x-dead-letter-routing-key' : 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic_publish(exchange='',
                      routing_key='hello_delay',
                      body="test",
                      properties=pika.BasicProperties(delivery_mode=2))

print " [x] Sent"
于 2013-06-09T21:58:47.780 に答える
17

RabbitMQ 公式プラグインx-delayed-messageを使用できます。

まず、ez ファイルをダウンロードしてYour_rabbitmq_root_path/pluginsにコピーします。

次に、プラグインを有効にします (サーバーを再起動する必要はありません)。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

最後に、次のような「x-delay」ヘッダーを使用してメッセージを公開します。

headers.put("x-delay", 5000);

知らせ:

メッセージの安全性を保証するものではありません。これは、rabbitmq サーバーのダウンタイム中にメッセージの有効期限が切れた場合、残念ながらメッセージが失われるためです。したがって、このスキームを使用するときは注意してください。

詳細はrabbitmq-delayed-message-exchangeでお楽しみください

于 2015-11-03T05:24:24.207 に答える
8

参考までに、Spring 3.2.x でこれを行う方法。

<rabbit:queue name="delayQueue" durable="true" queue-arguments="delayQueueArguments"/>

<rabbit:queue-arguments id="delayQueueArguments">
  <entry key="x-message-ttl">
    <value type="java.lang.Long">10000</value>
  </entry>
  <entry key="x-dead-letter-exchange" value="finalDestinationTopic"/>
  <entry key="x-dead-letter-routing-key" value="finalDestinationQueue"/>
</rabbit:queue-arguments>


<rabbit:fanout-exchange name="finalDestinationTopic">
  <rabbit:bindings>
    <rabbit:binding queue="finalDestinationQueue"/>
  </rabbit:bindings>
</rabbit:fanout-exchange>
于 2014-06-09T21:22:28.097 に答える