3

Golang (redis を使用) から書いたアプリケーションを Python に移植する作業を行っています。Celery を使用してタスクのキューイングを実行したいと思っていますが、ルーティングに関して質問があります...

私のアプリケーションは、REST POST を介して「イベント」を受け取ります。各「イベント」は異なるタイプにすることができます。次に、バックグラウンドでワーカーに特定のタイプのイベントを待機させたいと考えています。ここでの注意点は、1 つのイベントが、そのイベントを処理する複数のタスクになる可能性があることです。例えば:

some/lib/a/tasks.py

@task
def handle_event_typeA(a,b,c):
    # handles event...
    pass

@task
def handle_event_typeB(a,b,c):
    # handles other event...
    pass

some/lib/b/tasks.py

@task
def handle_event_typeA(a,b,c):
    # handles event slightly differently... but still same event...
    pass

要約すると... N 個のワーカーを (X 個のマシンで) 実行できるようにしたいのですが、これらの各作業には、a.handle_event_typeA、b.handle_event_typeA などのように Y 個のタスクが登録されます。 . タスクをキューに挿入し、1 つのワーカーにそのタスクをピックアップさせ、ワーカー内の複数のタスク (つまり、a.handle_event_typeA と b.handle_event_typeA の両方) にルーティングできるようにしたいと考えています。

ここの昆布のドキュメントとここのセロリのルーティングのドキュメントを読みましたが、これを正しく構成する方法がわかりません。

私は以前から、従来のワークフローに Celery を使用してきましたが、その機能セット、パフォーマンス、および安定性に非常に満足しています。Kombuを直接使用するか、自作ソリューションを使用して必要なものを実装しますが、可能であればCeleryを使用したいと思います.

みんなありがとう!この質問で誰かの時間を無駄にしないことを願っています。

編集 1

この問題についてもう少し考えた後、Celery で必要なものを実装するための回避策を考え出しました。これは最も洗練されたソリューションではありませんが、うまく機能しています。私はdjangoを使用しており、キャッシュの抽象化です(代わりにmemcachedやredisなどを直接使用できます)。これが私が思いついたスニペットです:

from django.core.cache import cache
from celery.execute import send_task

SUBSCRIBERS_KEY = 'task_subscribers.{0}'

def subscribe_task(key, task):
    # get current list of subscribers
    cache_key = SUBSCRIBERS_KEY.format(key)
    subscribers = cache.get(cache_key) or []
    # get task name
    if hasattr(task, 'delay'):
        name = task.name
    else:
        name = task
    # add to list
    if not name in subscribers:
        subscribers.append(name)
    # set cache
    cache.set(cache_key, subscribers)

def publish_task(key, *kargs):
    # get current list of subscribers
    cache_key = SUBSCRIBERS_KEY.format(key)
    subscribers = cache.get(cache_key) or []
    # iterate through all subscribers and execute task
    for task in subscribers:
        # send celery task
        send_task(task, args=kargs, kwargs={})

次に、次のようにして、さまざまなモジュールのタスクをサブスクライブします。

subscribe_task('typeA', 'some.lib.b.tasks.handle_event_typeA')

次に、REST イベントを処理するときにパブリッシュ タスク メソッドを呼び出すことができます。

4

0 に答える 0