0

Luigi を使用していくつかのタスクを実行しています。次に、出力を標準化されたファイルの場所に一括転送する必要があります。complete()これを行うために、オーバーライドされたメソッドを使用して WrapperTask を作成しました。

from luigi.task import flatten

class TaskX(luigi.WrapperTask):
    date = luigi.DateParameter()
    client = luigi.s3.S3Client()

    def requires(self):
        yield TaskA(date=self.date)     
        yield TaskB(date=self.date)

    def complete(self):
        tasks_complete = all(r.complete() for r in flatten(self.requires())) 

        ## at the end of everything, batch copy the files
        if tasks_complete:
            self.client.copy('current-old', 'current')
            return True
        else:
            return False


if __name__ == "__main__":
    luigi.run()

complete()しかし、プロセスが実際に終了したときに呼び出される条件部分を取得するのに問題があります。

これは、他の人が指摘した非同期動作のためだと思いますが、修正方法がわかりません。

これらのコマンドライン パラメーターを使用して Luigi を実行してみました。

$ PYTHONPATH="" luigi --module x TaskX --worker-retry-external-task

しかし、それは正しく機能していないようです。これは、このタイプのタスクを処理するための正しいアプローチですか?

また、私は興味があります — 誰かが--worker-retry-external-taskコマンドの経験がありますか? ちょっと理解に苦しむ。

ソースコードでは、

def _is_external(task):
    return task.run is None or task.run == NotImplemented

LuigiTask にメソッドがあるかどうかを判断するために呼び出されますが、run()メソッドはありWrapperTaskません。したがって、--retry-external-taskフラグがcomplete()完了するまでこれを再試行し、アクションを実行することを期待しています。ただし、インタプリタをいじってみると、次のように思われます。

>>> import luigi_newsletter_process
>>> task = luigi_newsletter_process.Newsletter()
>>> task.run
    <bound method Newsletter.run of Newsletter(date=2016-06-22, use_s3=True)>
>>> task.run()
>>> task.run == None
False
>>> task.run() == None
True

このコード スニペットは、想定どおりの動作をしていません。

私はここで基地外ですか?

4

1 に答える 1