2

セロリを使った分散ジョブ実行システムを構築しようとしています。

1台のマシン(localhost)で2つのワーカーを起動し、1つは加算タスク用、もう1つはadd減算タスク用で、いくつかの加算タスクを開始subするために使用add.delay()すると、減算ワーカーの端末にエラーが発生します。

[2013-03-05 15:51:18,898: ERROR/MainProcess] Received unregistered task of type 'add_tasks.add'.

このテストでは、2つの加算タスクを開始しました。1つは加算ワーカーによってキャッチされ、もう1つは減算ワーカーによってキャッチされたため、上記のエラーが発生しました。2番目の加算タスクが減算ワーカーによってキャッチされないように構成を変更するにはどうすればよいですか?ありがとう。

コードは次のとおりです。

add_tasks.py:

celery = Celery('add_tasks', backend='amqp', broker='amqp://guest@localhost//')

@celery.task
def add(x, y):
    sleep(20)
    return x + y

sub_tasks.py:

celery = Celery('sub_tasks', backend='amqp', broker='amqp://guest@localhost//')

@celery.task
def sub(x, y):
    sleep(10)
    return x - y

ローカルホストマシンの2つのターミナルでワーカーを起動しましたcelery -A add_tasks worker --loglevel=info -n worker1celery -A sub_tasks worker --loglevel=info -n worker2

4

1 に答える 1

4

最後に、このROUTER機能で問題を解決できることがわかりました。私は自分の解決策をここに置き、同じ問題を抱えている他の人にも役立つことを願っています。

ワーカーを起動するときに、-Q queueオプションを使用して、でタスクのみを受け入れるようにワーカーを制限できますqueue。私の状況では、を使用しcelery -A add_tasks worker --loglevel=info -n worker1 -Q additionました。

一方、新しいタスクを開始するときは、たとえば、とのキュー引数で明示的に示す必要がadd.apply_async(queue='addition',priority=0,args=[1,4])ありsub.apply_async(queue='subtraction',priority=0,args=[1,4])ます。その場合、加算タスクは減算ワーカーによって受け入れられません。

于 2013-03-05T09:52:14.517 に答える