1

次のようにセロリタスクをセットアップしています。

@celeryapp.task
def heavy_task(x, y):
    # some stuff
    for _ in range(10000):
        heavy_task_2.apply_async(args=(x,y),
                                 countdown=random.randint(60,120))
    return x+y

@celeryapp.task
def heavy_task_2(x, y):
    # some stuff
    return x+y

それぞれ 30 の同時実行性を持つ 5 つのワーカー (プリフォーク) があります。すべて prefetch_multiplier=1 および -Ofair 引数で実行されています。CELERY_ACKS_LATE=True で redis ブローカーを使用しています

ここで、セロリ ビート スケジュールから Heavy_task(1,2).delay() を呼び出します。タスクは任意の 1 つのワーカーに送られ、作成された 10,000 個のタスクはすべてそのワーカーにのみ存在し、ブローカーには公開されないため、他のワーカーは次のことができます。それらのタスクに取り組みます。ワーカーの prefetch_count は 10,000 まで増加し続けます

元のワーカーのメモリが 90% 近く消費され始めた後にのみ、これらのタスクがブローカーに公開され、他のワーカーに移行されます。ワーカーも OS によって強制終了されることがあるため、私のタスクは redis ブローカーで「未確認」ではないため、永久に失われます。

すべてのタスクで 1 人のワーカーに負担をかけずに、これらの二次タスクをすぐにブローカーに渡すにはどうすればよいですか?

4

0 に答える 0