5

私は最近、特定のキーを持つメッセージが常に同じワーカーにルーティングされるという概念(Celery の概念とは無関係)を含むStormを使用しています。fields groupinggroup()

私が意味することのより明確な定義を得るために、これはStorm wikiからのものです.

フィールドのグループ化: ストリームは、グループ化で指定されたフィールドによって分割されます。たとえば、ストリームが「user-id」フィールドによってグループ化されている場合、同じ「user-id」を持つタプルは常に同じタスクに送られますが、異なる「user-id」を持つタプルは異なるタスクに送られる可能性があります.

たとえば、単語リストから読み取ると、a、b、c で​​始まる単語をワーカー プロセスのみにルーティングし、d、e、f を別のプロセスにルーティングしたいと考えています。

これが必要な理由は、プロセス間で競合状態が発生しないように、1 つのプロセスが同じデータのセットに対するデータベースの読み取り/書き込みを担当するようにしたいからかもしれません。

Celery 内でこれを達成するための最良の方法を考え出そうとしています。

これまでの私の最善の解決策は、「グループ」ごとにキューを使用し (例えば、letter.a、letters.d)、ワーカー プロセスの数がキューの数と正確に一致するようにすることです。欠点は、ワーカーが死亡したときやワーカーが追加/削除されたときなどのさまざまなシナリオとともに、ワーカーごとに 1 つのプロセスのみを実行する必要があることです。

Celery は初めてなので、参照している概念が間違っている場合は修正してください。

4

2 に答える 2

6

少し接着剤が含まれていますが、コンセプトは次のとおりです。

を使用して、タスクを別のワーカーに直接送信する方法がありますCELERY_WORKER_DIRECT。に設定するとTrue、各ワーカーへのルートが作成されます。

私は、アクティブなホストを使用するか、アクティブなホストを決定して、アクティブなワーカーを定期的に決定celery.current_app.control.inspect().ping()します。例えば:

>>> hosts = sorted(celery.current_app.control.inspect().ping().keys())
['host5', 'host6']

キーでルーティングする必要がある場合は、値をハッシュしてからワーカー数でモジュロします。これにより、タスクが均等に分散され、同じキーが同じワーカーに渡されます。例えば:

>>> host_id = hash('hello') % len(hosts)
1
>>> host = hosts[host_id]
'host6'

次に、タスクを実行するときに、次のように交換キーとルーティング キーを指定するだけです。

my_task.apply_async(exchange='C.dq', routing_key=host)

いくつかの欠点があります。

  1. 私が見る限り、ワーカーに > 1 の同時実行を設定すると、各プロセスが同じものを消費し、この演習全体が無効になります。不幸な修正は、それを 1 のままにしておくことです。
  2. ping()と の間でワーカーがダウンした場合apply_async、メッセージは存在しないルートに送信されます。これを修正するには、タイムアウトをキャッチし、使用可能なホストを再アサートし、再ハッシュして再送信します。
于 2013-11-11T11:09:02.717 に答える