1

私は AMQP を学んでいるので、これはおそらく私がやっていることの誤解です。システムのファームにワークアウトを提供するために、自分のプライベートサーバーにrabbitmqをセットアップしようとしています(処理する必要がある画像がたくさんあります)。3 ステップのパイプラインをセットアップしました。

| put image on queue | --work_queue--> | process | --results--> | archive results | 

rabbitmq をインストールし、サーバーに 2 つのキューを作成しました。「work_queue」と「results」。amqplib を使用していくつかの Python スクリプトを作成しました。パイプラインは 1 つのイメージ プロセッサ ワーカーで問題なく動作しています。キューに 100 個の画像を追加しましたが、1 台のマシンが喜んで一度に 1 つずつ取得し、データをかき回して、結果を結果キューに入れています。

問題は、別のマシンで別のイメージ プロセスを開始すると、キューから作業が取り出されるだけだと思っていたことです。これは、rabbitmq サイトの「ワーク キュー」チュートリアルにリストされている正確なケースのようです。私はこれがうまくいくと思っていました。しかし、実際には、work_queue で待機しているメッセージがたくさんあるにもかかわらず、他のワーカーをいくつ起動しても、それらのワーカーは永遠に待機し、まったく仕事を取得しません。

私は何を誤解しましたか?サーバーで作業をキューに入れる関連コード:

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="foo:5672 ", userid="pipeline",
    password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()

....

msg = { 'filename': os.path.basename(i) }

chan.basic_publish(amqp.Message(json.dumps(msg)), exchange='',
        routing_key='work_queue')

そして、プロセスワーカーのコンシューマー側:

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="foo:5672 ", userid="pipeline",
    password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()

def work_callback(msg):
.... 

while True:
    chan.basic_consume(callback=work_callback, queue='work_queue')
    try:
        chan.wait()
    except KeyboardInterrupt:
        print "\nExiting cleanly"
        sys.exit(0)

他のワーカーが接続されていることがわかります。

$ sudo rabbitmqctl list_queues
Listing queues ...
results 0
work_queue      246
...done.

$ sudo rabbitmqctl list_connections 
Listing connections ...
pipeline        192.168.8.1     41553   running
pipeline        XX.YY.ZZ.WW     46676   running
pipeline        192.168.8.4     44482   running
pipeline        192.168.8.6     41884   running
...done.

ここで、XX.YY.ZZ.WW は外部 IP です。192.168.8.6 のワーカーはキューで大量に処理されますが、外部 IP のワーカーは待機している間、246 個のメッセージが待機しているはずのキューで待機しています。

考え?

4

1 に答える 1

0

自分の質問に答えて申し訳ありませんが、ようやく探していた動作が得られました。QoS prefetch_count を 1 に設定する必要がありました。どうやら、prefetch_count を設定しないことで、1 つのクライアントが接続するとすぐに、200 以上のメッセージすべてがその「チャネル」に配信され、キューから排出されたようです。元は切断されていました (したがって、チャネルを閉じて、メッセージをキューに戻します。

追加することにより:

chan.basic_qos(0,1,False)

私のワーカーにとっては、一度に 1 つのメッセージしか取得できないようになりました。まさに私が期待していたものではありませんでしたが、今では意図したとおりに機能しています。

于 2013-08-31T01:09:27.993 に答える