3

私は、呼び出されたときに、ねじれたリアクター内のいくつかの並列コードの実行を単純に開始するセロリタスクを持っています。説明するためのサンプル (実行可能ではない) コードを次に示します。

def run_task_in_reactor():
   # this takes a while to run
   do_something()
   do_something_more()


@celery.task
def run_task():
   print "Started reactor"
   reactor.callFromThread(run_task_in_reactor)

(簡単にするために、タスクがワーカーによって受信されたときにリアクターが既に実行されていると仮定してください。ワーカー@worker_process_init.connectが起動するとすぐにシグナルを使用して、別のスレッドでリアクターを開始しました)

を呼び出すrun_task.delay()と、タスクはすぐに終了します (終了を待たずrun_task_in_reactor()に、リアクターで実行をスケジュールするだけなので)。そして、run_task_in_reactor()最終的に実行されると、do_something()またはdo_something_more()例外がスローされる可能性がありますが、これは見過ごされます。

pikaたとえば、キュ​​ーから消費するために使用して、内部で ACK を使用しdo_something_more()て、ワーカーにタスクの正しい完了を通知させることができます。ただし、セロリ内では、これは不可能のようです (または、少なくとも、同じ効果を達成する方法がわかりません)。

また、使用しているサードパーティ コードの要件であるため、リアクターを削除できません。同じ結果を達成する他の方法も高く評価されます。

4

1 に答える 1