私は、呼び出されたときに、ねじれたリアクター内のいくつかの並列コードの実行を単純に開始するセロリタスクを持っています。説明するためのサンプル (実行可能ではない) コードを次に示します。
def run_task_in_reactor():
# this takes a while to run
do_something()
do_something_more()
@celery.task
def run_task():
print "Started reactor"
reactor.callFromThread(run_task_in_reactor)
(簡単にするために、タスクがワーカーによって受信されたときにリアクターが既に実行されていると仮定してください。ワーカー@worker_process_init.connect
が起動するとすぐにシグナルを使用して、別のスレッドでリアクターを開始しました)
を呼び出すrun_task.delay()
と、タスクはすぐに終了します (終了を待たずrun_task_in_reactor()
に、リアクターで実行をスケジュールするだけなので)。そして、run_task_in_reactor()
最終的に実行されると、do_something()
またはdo_something_more()
例外がスローされる可能性がありますが、これは見過ごされます。
pika
たとえば、キューから消費するために使用して、内部で ACK を使用しdo_something_more()
て、ワーカーにタスクの正しい完了を通知させることができます。ただし、セロリ内では、これは不可能のようです (または、少なくとも、同じ効果を達成する方法がわかりません)。
また、使用しているサードパーティ コードの要件であるため、リアクターを削除できません。同じ結果を達成する他の方法も高く評価されます。