5

手動で作成されたファイルが存在するかどうかを確認する Luigi パイプラインに取り組んでおり、存在する場合は次のタスクに進みます。

import luigi, os

class ExternalFileChecker(luigi.ExternalTask):
    task_namespace='MyTask'
    path = luigi.Parameter()
    def output(self):
        return luigi.LocalTarget(os.path.join(self.path, 'externalfile.txt'))

 class ProcessExternalFile(luigi.Task):
      task_namespace='MyTask'
      path = luigi.Parameter()

      def requires(self):
          return ExternalFileChecker(path=self.path)

      def output(self):
          dirname = self.path
          outfile = os.path.join(dirname, 'processedfile.txt')
          return luigi.LocalTarget(outfile)

      def run(self):
          #do processing

if __name__ == '__main__':
      path = r'D:\MyPath\luigi'
      luigi.run(['MyTask.ProcessExternalFile','--path', path,\
      '--worker-retry-external-tasks','--scheduler-retry-delay', '20',\
      '--worker-keep-alive'])

私が望むのは、手動ファイルを作成してパスに貼り付けた後、ルイージが続行することです。これを行うと、ファイルを見つけてタスクを続行する代わりに、数秒ごとに新しいタスクを再チェックします。

DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 1.536391 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 5.669132 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
(...)

かなりの時間 (15 ~ 20 分程度) が経過すると、luigi はファイルを見つけ、必要に応じて続行できるようになります。この遅延を防ぐにはどうすればよいですか? ファイルが存在し次第、ルイージに続行してもらいたい。

4

1 に答える 1