9

ルイージを使用してパイプラインを起動しています。簡単な例を見てみましょう

task = myTask()
w = Worker(scheduler=CentralPlannerScheduler(), worker_processes=1)
w.add(task)
w.run()

myTaskここで、実行中に例外が発生したとしましょう。私が持つことができるのは、例外を示す luigi からのログだけです。

ルイージがそれを伝播したり、少なくともfailureステータスを返す方法はありますか?

その後、その状態に応じてプログラムを反応させることができます。

ありがとう。

EDIT 結果を保存するときに、ルイージの出力がデータベースをターゲットにしていることを指定するのを忘れました。例外が発生した場合、結果は保存されませんが、例外は luigi に伝播されません。ルイージにこれを持つオプションがあるかどうか疑問に思っていました。

4

2 に答える 2

21

ドキュメントから:

Luigi には組み込みのイベント システムがあり、イベントへのコールバックを登録して、独自のタスクからそれらをトリガーできます。定義済みのイベントにフックすることも、独自のイベントを作成することもできます。各イベント ハンドルは Task クラスに関連付けられており、そのクラスまたはそのサブクラスからのみトリガーされます。これにより、特定のクラス (hadoop ジョブなど) からのイベントのみを簡単にサブスクライブできます。

例:

import luigi

from my_tasks import MyTask


@MyTask.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    """Will be called directly after a failed execution
    of `run` on any MyTask subclass
    """

    do_something()


luigi.run()

ルイージにはたくさんのイベントがありますこのテストを見て、他のイベントをリッスンして反応する方法を学ぶこともできます。

于 2015-10-28T16:37:49.973 に答える
-1

あなたができることは、エラーをファイルに書き込むことです。たとえば、失敗する可能性のあるタスク (TaskA と呼びましょう) では、次のようになります。

x=""
try:
    do stuff
except:
    x="error!"
with open('errorfile.log','w') as f:
    f.write(x)

次に、そのエラーに依存するタスクで、そのタスクは TaskA を必要とします。そして、次のようなことができます:

with open('errorfile.log','r') as f:
    if f.read()://if anything is in the error log from TaskA
        //error occurred
        do stuff
    else:
        do other stuff
于 2015-09-29T15:49:13.347 に答える