私は AMQP を学んでいるので、これはおそらく私がやっていることの誤解です。システムのファームにワークアウトを提供するために、自分のプライベートサーバーにrabbitmqをセットアップしようとしています(処理する必要がある画像がたくさんあります)。3 ステップのパイプラインをセットアップしました。
| put image on queue | --work_queue--> | process | --results--> | archive results |
rabbitmq をインストールし、サーバーに 2 つのキューを作成しました。「work_queue」と「results」。amqplib を使用していくつかの Python スクリプトを作成しました。パイプラインは 1 つのイメージ プロセッサ ワーカーで問題なく動作しています。キューに 100 個の画像を追加しましたが、1 台のマシンが喜んで一度に 1 つずつ取得し、データをかき回して、結果を結果キューに入れています。
問題は、別のマシンで別のイメージ プロセスを開始すると、キューから作業が取り出されるだけだと思っていたことです。これは、rabbitmq サイトの「ワーク キュー」チュートリアルにリストされている正確なケースのようです。私はこれがうまくいくと思っていました。しかし、実際には、work_queue で待機しているメッセージがたくさんあるにもかかわらず、他のワーカーをいくつ起動しても、それらのワーカーは永遠に待機し、まったく仕事を取得しません。
私は何を誤解しましたか?サーバーで作業をキューに入れる関連コード:
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="foo:5672 ", userid="pipeline",
password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()
....
msg = { 'filename': os.path.basename(i) }
chan.basic_publish(amqp.Message(json.dumps(msg)), exchange='',
routing_key='work_queue')
そして、プロセスワーカーのコンシューマー側:
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="foo:5672 ", userid="pipeline",
password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()
def work_callback(msg):
....
while True:
chan.basic_consume(callback=work_callback, queue='work_queue')
try:
chan.wait()
except KeyboardInterrupt:
print "\nExiting cleanly"
sys.exit(0)
他のワーカーが接続されていることがわかります。
$ sudo rabbitmqctl list_queues
Listing queues ...
results 0
work_queue 246
...done.
$ sudo rabbitmqctl list_connections
Listing connections ...
pipeline 192.168.8.1 41553 running
pipeline XX.YY.ZZ.WW 46676 running
pipeline 192.168.8.4 44482 running
pipeline 192.168.8.6 41884 running
...done.
ここで、XX.YY.ZZ.WW は外部 IP です。192.168.8.6 のワーカーはキューで大量に処理されますが、外部 IP のワーカーは待機している間、246 個のメッセージが待機しているはずのキューで待機しています。
考え?