Luigi がメソッドを実行する順序は何ですか (run、output、require)。タスク DAG の有効性をチェックするための最初のチェックとして require が実行されることは理解していますが、run() の後に出力を実行するべきではありませんか?
私は実際に実行中のkafkaメッセージを待ち、それに基づいて他のタスクの束をトリガーし、LocalTargetを返そうとしています。このような:
def run(self):
for message in self.consumer:
self.metadata_key = str(message.value, 'utf-8')
self.path = os.path.join(settings.LUIGI_OUTPUT_PATH, self.metadata_key, self.batch_id)
if not os.path.exists(self.path):
os.mkdir(self.path)
with self.conn.cursor() as cursor:
all_accounts = cursor.execute('select domainname from tblaccountinfo;')
for each in all_accounts:
open(os.path.join(self.path,each)).close()
def output(self):
return LocalTarget(self.path)
ただし、次のエラーが表示されます。
例外: path または is_tmp を設定する必要があります
return LocalTarget(self.path)行。def run() が完了するまで luigi が def output() メソッドを実行しようとするのはなぜですか?