Luigi を使用していくつかのタスクを実行しています。次に、出力を標準化されたファイルの場所に一括転送する必要があります。complete()
これを行うために、オーバーライドされたメソッドを使用して WrapperTask を作成しました。
from luigi.task import flatten
class TaskX(luigi.WrapperTask):
date = luigi.DateParameter()
client = luigi.s3.S3Client()
def requires(self):
yield TaskA(date=self.date)
yield TaskB(date=self.date)
def complete(self):
tasks_complete = all(r.complete() for r in flatten(self.requires()))
## at the end of everything, batch copy the files
if tasks_complete:
self.client.copy('current-old', 'current')
return True
else:
return False
if __name__ == "__main__":
luigi.run()
complete()
しかし、プロセスが実際に終了したときに呼び出される条件部分を取得するのに問題があります。
これは、他の人が指摘した非同期動作のためだと思いますが、修正方法がわかりません。
これらのコマンドライン パラメーターを使用して Luigi を実行してみました。
$ PYTHONPATH="" luigi --module x TaskX --worker-retry-external-task
しかし、それは正しく機能していないようです。これは、このタイプのタスクを処理するための正しいアプローチですか?
また、私は興味があります — 誰かが--worker-retry-external-task
コマンドの経験がありますか? ちょっと理解に苦しむ。
ソースコードでは、
def _is_external(task):
return task.run is None or task.run == NotImplemented
LuigiTask にメソッドがあるかどうかを判断するために呼び出されますが、run()
メソッドはありWrapperTask
ません。したがって、--retry-external-task
フラグがcomplete()
完了するまでこれを再試行し、アクションを実行することを期待しています。ただし、インタプリタをいじってみると、次のように思われます。
>>> import luigi_newsletter_process
>>> task = luigi_newsletter_process.Newsletter()
>>> task.run
<bound method Newsletter.run of Newsletter(date=2016-06-22, use_s3=True)>
>>> task.run()
>>> task.run == None
False
>>> task.run() == None
True
このコード スニペットは、想定どおりの動作をしていません。
私はここで基地外ですか?