0

最近、プロジェクトの 1 つに統合する必要があるため、セロリ昆布のドキュメントを調べています。これがどのように機能するかについての基本的な理解はありますが、さまざまなブローカーを使用したドキュメントの例では混乱しています。

シナリオは次のとおりです。

私のアプリケーション内には 2 つのビューがViewAあり、ViewBどちらも高価な処理を行うため、処理にセロリ タスクを使用する必要がありました。これが私がしたことです。

ビュー.py

def ViewA(request):
    tasks.do_task_a.apply_async(args=[a, b])


def ViewB(request):
    tasks.do_task_b.apply_async(args=[a, b])

タスク.py

@task()
def do_task_a(a, b):
    # Do something Expensive

@task()
def do_task_b(a, b):
    # Do something Expensive here too

これまでのところ、すべてが正常に機能しています。問題は、システム上にファイルをdo_task_a作成することです。今、メソッドでファイルの存在を確認し、ファイルが存在しない場合はタスクメソッドを呼び出すことができます[これは私が今行っていることです]。txtdo_task_bdo_task_bretry

ここでは、むしろ別のアプローチを取りたいと思います (つまり、メッセージングの出番です)。ファイルが作成されるまで再試行メソッドをループするのではなく、ファイルが作成されたらdo_task_aメッセージを送信したいと思います。do_task_b

のドキュメントを読み、celerykombuのように設定を更新しました。

BROKER_URL = "django://"
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = "sqlite:///celery"
TASK_RETRY_DELAY = 30 #Define Time in Seconds
DATABASE_ROUTERS = ['portal.db_routers.CeleryRouter']
CELERY_QUEUES = ( 
    Queue('filecreation', exchange=exchanges.genex, routing_key='file.create'),
)
CELERY_ROUTES = ('celeryconf.routers.CeleryTaskRouter',)

そして私はここで立ち往生しています。ここからどこへ行けばいいのかわからない。

ファイル作成時にdo_task_aメッセージをブロードキャストするには、次に何をすればよいですか? メッセージを受信 (消費) し、コードをさらに処理するdo_task_bにはどうすればよいですか??do_task_b

アイデアや提案は大歓迎です。

4

1 に答える 1

1

これは、Celeryのコールバック/リンク関数を使用するための良い例です。

Celeryは、あるタスクが別のタスクに続くように、タスクをリンクすることをサポートしています。あなたはここでそれについてもっと読むことができます

apply_async()関数には2つのオプションの引数があります

+link : excute the linked function on success 
+link_error : excute the linked function on an error

@task
def add(a, b):
    return a + b

@task
def total(numbers):
    return sum(numbers)

@task
def error_handler(uuid):
    result = AsyncResult(uuid)
    exc = result.get(propagate=False)
    print('Task %r raised exception: %r\n%r' % (exc, result.traceback))

今、あなたの呼び出し関数で次のようなことをしてください

def main():
    #for error_handling
    add.apply_async((2, 2), link_error=error_handler.subtask())

    #for linking 2 tasks
    add.apply_async((2, 2), link=add.subtask((8, )))
    # output 12

    #what you can do is your case is something like this. 
    if user_requires:
        add.apply_async((2, 2), link=add.subtask((8, )))
    else:
        add.apply_async((2, 2))

これがお役に立てば幸いです

于 2013-01-04T10:50:36.750 に答える