0

簡単な例として、新しい RabbitMQ(v 2.6.1) キューに 5 つのアイテムを追加します。

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='not.my.real.server.net'))
channel = connection.channel()
channel.queue_declare(queue='dw.neil',durable=True)
# add 5 messages to the queue, the numbers 1-5
for x in range(5):
    message = x+1
    channel.basic_publish(exchange='',routing_key='dw.neil', body=str(message))
    print " [x] Sent '%s'" % message
connection.close()

キューを消去してから、上記のコードを実行して 5 つのアイテムを追加します。

nkodner@hadoop4 sports_load_v2$ python send_5.py 
 [x] Sent '1'
 [x] Sent '2'
 [x] Sent '3'
 [x] Sent '4'
 [x] Sent '5'

今、失敗した処理をシミュレートしようとしています。キューから消費する次のコードが与えられます。basic_ack への呼び出しがコメントアウトされていることに注意してください。

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='not.my.real.server.net'))
channel = connection.channel()
channel.queue_declare(queue='dw.neil',durable=True)
method_frame, header_frame, body=channel.basic_get(queue='dw.neil')
print method_frame, header_frame
print "body: %s" % body
#channel.basic_ack(delivery_tag=method_frame.delivery_tag)
connection.close()

受信コードを実行して、キューからアイテムを取得します。予想どおり、アイテム #1 を取得します。

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 1

channel.basic_ack() への呼び出しはコメントアウトされているため、未確認のメッセージがキューに配置され、次のコンシューマーがそれを取得できるようになると思います。Redelivered プロパティが True に設定された状態で、メッセージ #1 が (再び) キューから出された最初のメッセージであることを願っています。代わりに、メッセージ #2​​ が受信されます。

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 2

また、#1 が再配信フラグを True に設定して戻ってくる前に、キュー内の他のすべてのメッセージが受信されます。

...

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 5

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=True', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 1

確認されるまで #1 を配信し続けるように設定できるプロパティまたはオプションはありますか?

私のユースケースは、順次生成されたファイルをデータ ウェアハウスにロードすることです。メッセージベースの処理を使用して、いくつかの新しいファイルの準備ができており、DW にロードされることをプログラムに知らせています。ファイルは、生成された順に処理する必要があります。

4

2 に答える 2

3

これは、RabbitMQ 2.7.0 で対処されています - 私たちは 2.6.1 を実行していました。

リリースノートから:

このリリースの新機能は次のとおりです。

  • コンシューマ用に再キューイングされたメッセージの順序が保持されます
于 2012-06-22T20:32:58.647 に答える
1

channel.basic_reject を使用してみてください。これにより、未確認のメッセージが RabbitMQ にプッシュされ、メッセージが新しいメッセージとして扱われます。また、失敗したメッセージがスタックしている場合は、 channel.basic_recover を使用して、確認応答されていないすべてのメッセージを再配信するように RabbitMQ に指示できます。

http://www.rabbitmq.com/extensions.html#negative-acknowledgementsは、Basic.Reject と Basic.Nack に関する識別情報を提供します。

メッセージ順序のセマンティクスについては、http: //www.rabbitmq.com/semantics.html で説明されています。

于 2012-05-14T16:01:52.617 に答える