ではluigi.Task.run
、オブジェクトをファイル/データベース/などにシリアライズする必要があります。
MyTask(luigi.Task):
param = luigi.Parameter()
def requires(self):
AnotherTask(self.param)
def output(self):
luigi.LocalTarget('out_{}'.format(self.param))
def run(self):
with self.input().open('r') as infile:
# instantiate incoming data
indata = pd.read_csv(infile, index=False, parse_date=...)
# my process
with self.output().open('w') as outfile:
# serialize outgoing data
outdata.to_csv(outfile, index=False, ...)
ただし、便宜上、pd.read_csv(...)
タスクを再利用するときに同じインスタンス化手順を記述する必要があるため、スニペットをスキップしたいと思います。
このようにルイージで自動的にインスタンス化する方法はありますか?:
AnotherTask(luigi.Task):
param = luigi.Parameter()
def requires(self):
...
def output(self):
...
def _instantiate(self):
with self.output().open('r') as outfile:
outdata = pd.read_csv(outfile, index=False, parse_date=...)
return outdata
MyTask(luigi.Task):
param = luigi.Parameter()
def requires(self):
AnotherTask(self.param)
def output(self):
luigi.LocalTarget('out_{}'.format(self.param))
def run(self):
# automatic instantiation via AnotherTask._instantiate()
indata = self.input()
# my process
outdata = indata.someprocess()
with self.output().open('w') as outfile:
# serialize outgoing data
outdata.to_csv(outfile, index=False, ...)