これは、アプリケーションを開発するときに非常に便利であることがわかりました。メッセージを単に再キューイングする代わりの方法を提供するためです。これにより、コードの複雑さを簡単に軽減でき、RabbitMQ の多くの強力な隠し機能の 1 つです。
手順
最初に、メイン キュー用と遅延キュー用の 2 つの基本チャネルを設定する必要があります。最後の例では、必須ではありませんがコードの信頼性を高めるフラグをいくつか追加しています。confirm delivery
、delivery_mode
およびなどdurable
。これらの詳細については、RabbitMQ のマニュアルを参照してください。
チャネルを設定したら、遅延チャネルからメイン キューにメッセージを送信するために使用できるバインディングをメイン チャネルに追加します。
channel.queue_bind(exchange='amq.direct',
queue='hello')
次に、期限切れになったメッセージをメイン キューに転送するように遅延チャネルを構成する必要があります。
delay_channel.queue_declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl' : 5000,
'x-dead-letter-exchange' : 'amq.direct',
'x-dead-letter-routing-key' : 'hello'
})
x-message-ttl (メッセージ - 生存時間)
これは通常、特定の期間が経過した後にキュー内の古いメッセージを自動的に削除するために使用されますが、オプションの引数を 2 つ追加することで、この動作を変更できます。代わりに、このパラメーターで、メッセージが遅延キューに留まる時間をミリ秒単位で決定できます。
x-dead-letter-routing-key
この変数を使用すると、メッセージを完全に削除するデフォルトの動作ではなく、有効期限が切れたメッセージを別のキューに転送できます。
x-dead-letter-exchange
この変数は、メッセージを hello_delay から hello キューに転送するために使用された Exchange を決定します。
遅延キューへの発行
すべての基本的な Pika パラメーターの設定が完了したら、基本的なパブリッシュを使用して遅延キューにメッセージを送信するだけです。
delay_channel.basic_publish(exchange='',
routing_key='hello_delay',
body="test",
properties=pika.BasicProperties(delivery_mode=2))
スクリプトを実行すると、RabbitMQ 管理モジュールに作成された次のキューが表示されます。
例。
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_delivery()
channel.queue_declare(queue='hello', durable=True)
# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue_bind(exchange='amq.direct',
queue='hello')
# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_delivery()
# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue_declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl' : 5000, # Delay until the message is transferred in milliseconds.
'x-dead-letter-exchange' : 'amq.direct', # Exchange used to transfer the message from A to B.
'x-dead-letter-routing-key' : 'hello' # Name of the queue we want the message transferred to.
})
delay_channel.basic_publish(exchange='',
routing_key='hello_delay',
body="test",
properties=pika.BasicProperties(delivery_mode=2))
print " [x] Sent"