2

Luigi がメソッドを実行する順序は何ですか (run、output、require)。タスク DAG の有効性をチェックするための最初のチェックとして require が実行されることは理解していますが、run() の後に出力を実行するべきではありませんか?

私は実際に実行中のkafkaメッセージを待ち、それに基づいて他のタスクの束をトリガーし、LocalTargetを返そうとしています。このような:

def run(self):
    for message in self.consumer:
        self.metadata_key = str(message.value, 'utf-8')
        self.path = os.path.join(settings.LUIGI_OUTPUT_PATH, self.metadata_key, self.batch_id)
        if not os.path.exists(self.path):
            os.mkdir(self.path)

        with self.conn.cursor() as cursor:
              all_accounts = cursor.execute('select domainname from tblaccountinfo;')
        for each in all_accounts:
            open(os.path.join(self.path,each)).close()

def output(self):
    return LocalTarget(self.path)

ただし、次のエラーが表示されます。

例外: path または is_tmp を設定する必要があります

return LocalTarget(self.path)行。def run() が完了するまで luigi が def output() メソッドを実行しようとするのはなぜですか?

4

2 に答える 2