私はCeleryを使用して、数千のタスクのグループを実行しています。各タスクの実行には数分かかります。以下のコードは、次の単純なドロップイン置換ですmultiprocessing.pool.Pool.map
。
def map(task, data):
"""
Perform the *task* on *data* in distributed way. Blocks until finished.
"""
ret = celery_module.group(task.s(val) for val in data).apply_async()
return ret.get(interval = 0.1)
労働者が決して壊れない限り、これは魅力のように機能します。しかし、ノードがいくつかの実行中のタスクを実行して停止することがあります。その後、他のすべてのタスクが終了し、ワーカーはアイドル状態になりget
ますが、死んだワーカーからの結果を永遠に待ちます。
タイムアウト後にデッドタスクを再試行させる方法は?タスクはべき等であり、重複した実行についてはまったく心配していません。私はCELERY_ACKS_LATE
あちこちでタイムアウトをいじって入れようとしましたが、この状況を改善するものは何もなかったようです。当たり前のことを見逃した気がしますが、なにかわかりません。
編集:ブローカーと結果の両方に使用されるトランスポートはRedisです。