12

Python/Django/Celery ベースの展開ツールの場合、次のセットアップがあります。

  1. 現在、デフォルトの Celery セットアップを使用しています。(「セロリ」と呼ばれる 1 つのキュー + 交換。)
  2. キューの各タスクは、展開操作を表します。
  3. 環境の各タスクは、(非常に) 時間がかかる可能性がある同期フェーズで終了します。

次の仕様を満たす必要があります。

  1. 並行性 : 複数の環境のタスクを同時に実行する必要があります。
  2. ロック:各環境で同時に実行されるタスクは最大で 1 つです (つまり、環境ロック)。
  3. スループットの最適化: 1 つの環境に複数のタスクがある場合、それらの同期フェーズを組み合わせて最適化することができます。そのため、タスクが終了間近になった場合、この環境のキューで待機している新しいタスクがあるかどうかを確認し、ある場合は同期フェーズをスキップする必要があります。

これを実装するための好ましい方法は何ですか?

いくつかの考え:

  • 環境ごとに 1 つずつ、複数のキューを設定する必要があると言えます。また、N 個のセロリ ワーカーがそれぞれ 1 つのキューを排他的に処理する必要があります。(これで仕様 1+2 は解決されます。)
    しかし、複数のセロリ ワーカーが異なるキューを排他的にリッスンするにはどうすればよいでしょうか?
  • 環境のキューで待機しているタスクが他にもあることを知る明確な方法はありますか?
4

2 に答える 2

2

for 1,2 複数のキューを使用し、ワーカーを -Q で起動して、リッスンするキューを指定します。また、CELERYD_PREFETCH_MULTIPLIER = 1 を一度に 1 つのタスクだけに設定します。

キューの長さを取得するには (rabbitmq でテスト済み)、次のようなものを使用できます。

from kombu.connection import BrokerConnection
connection = BrokerConnection(BROKER_HOST, BROKER_USER...)
channel = connection.channel()
q, j, c = channel.queue_declare('celery', passive=True)
print 'celery %d jobs in queue' % j

'queue_delcare' は副作用として、キューの長さを示します。これがあなたを助けることを願っています。

于 2011-04-07T10:13:29.990 に答える
1

1 つの超高速ライブラリでメッセージングとマルチスレッドを実行できるzeromqを見てみましょう。また、多数の言語をサポートし、負荷分散が組み込まれています。

于 2011-04-05T18:43:33.767 に答える