2

多くのプロジェクトで、パイプライン ツールとしてluigiを使用しています。これを利用してパラメータ検索を実装することを考えました。標準luigi.file.LocalTargetには、パラメーターを処理するための非常に単純なアプローチがあり、ドキュメントの例にも示されています。

def output(self):
    return luigi.LocalTarget("data/artist_streams_%s.tsv" % self.date_interval)

つまり、パラメータはファイル名で保存されます。これにより、特定のパラメーターの組み合わせが既に計算されているかどうかを簡単に確認できます。これは、タスクのパラメーターがより複雑になるとすぐに厄介になります。

パラメータ検索の非常に単純なアイデアを次に示します。

import luigi
class Sum(luigi.Task):

    long_ = luigi.Parameter()
    list_ = luigi.Parameter()
    of = luigi.Parameter()
    parameters = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget("task{}_{}_{}_{}.txt".format(self.long_,
                                                              self.list_,
                                                              self.of,
                                                              self.parameters))

    def run(self):

        sum_ = self.long_ + self.list_ + self.of + self.parameters
        with self.output().open('w') as out_file:
            out_file.write(str(sum_))


class ParameterSearch(luigi.Task):

    def requires(self):

        list_of_parameter_combinations = [
            {
                "long_" : 1,
                "list_" : 2,
                "of" : 3,
                "parameters" : 4

            },{
                "long_" : 5,
                "list_" : 6,
                "of" : 7,
                "parameters" : 8
            }
        ]

        for pc in list_of_parameter_combinations:
            yield Sum(**pc)

確かに、この例では、4 つのパラメーターすべてをファイル名にエンコードできますが、このアプローチが境界に到達できるという多くの空想は必要ありません。配列のようなパラメーターの例を考えてみてください。

私のフォローアップのアイデアは、パラメータと結果をある種のエンベロープオブジェクトに保存し、それをターゲットとして保存することでした。ファイル名は、最初のあいまい検索のパラメーターのある種のハッシュである可能性があります。

封筒クラスあり

class Envelope(object):

    @classmethod
    def hashify(cls, params):
        return hash(frozenset(params.items()))

    def __init__(self, result, **params):

        self.params = {}
        for k in params:
            self.params[k] = params.get(k)

    def hash(self):
        return Envelope.hashify(self.params)

次に、LocalTarget を強化し、エンベロープ内のすべてのパラメーターが一致しているかどうかを確認できる新しいターゲットがあります。

class EnvelopedTarget(luigi.file.LocalTarget):

    fs = luigi.file.LocalFileSystem()

    def __init__(self, params, path=None, format=None, is_tmp=False):
        self.path = path
        self.params = params

        if format is None:
            format = luigi.file.get_default_format()

        if not path:
            if not is_tmp:
                raise Exception('path or is_tmp must be set')
            path = os.path.join(tempfile.gettempdir(), 'luigi-tmp-%09d' % random.randint(0, 999999999))
        super(EnvelopedTarget, self).__init__(path)
        self.format = format
        self.is_tmp = is_tmp

    def exists(self):
        path = self.path
        if '*' in path or '?' in path or '[' in path or '{' in path:
            logger.warning("Using wildcards in path %s might lead to processing of an incomplete dataset; "
                           "override exists() to suppress the warning.", path)
        if self.fs.exists(path):
            with self.open() as fin:
                envelope = pickle.load(fin)

                try:
                    assert len(envelope.params) == len(self.params)
                    for param,paramval in self.params.items():
                        assert paramval == envelope.params.get(param)

                except(AssertionError):
                    return False
            return True
        else:
            return False

ここでの問題は、このターゲットを使用すると、もともと luigi が最小化することを目指していたボイラープレートが追加されることです。新しい基本タスクを設定しました

class BaseTask(luigi.Task):

    def output(self, envelope):
        path = '{}{}.txt'.format(type(self).__name__, envelope.hash())
        params = envelope.params
        return EnvelopedTarget(params, path=path)

    def complete(self):

        envelope = Envelope(None, **self.param_kwargs)

        outputs = flatten(self.output(envelope))
        if len(outputs) == 0:
            warnings.warn(
                "Task %r without outputs has no custom complete() method" % self,
                stacklevel=2
            )
            return False

        return all(map(lambda output: output.exists(), outputs))

    def run(self):

        result, outparams = self.my_run()

        envelope = Envelope(result, **outparams)

        with self.output(envelope).open('w') as fout:
            pickle.dump(envelope, fout)

結果のEnvelopedSumTask は非常に小さくなります。

class EnvelopedSum(BaseTask):

    long_ = luigi.Parameter()
    list_ = luigi.Parameter()
    of = luigi.Parameter()
    parameters = luigi.Parameter()

    def my_run(self):
        return sum(self.param_kwargs.values()), self.param_kwargs

Sumこのタスクは、最初のタスクと同じ方法で実行できます。

注: luigi-task-results をエンベロープする方法のこの実装例は、安定とはほど遠いものであり、結果とパラメーターをエンベロープすることによって私が意味するものをより具体的に示しています。

私の質問は: ルイージで多くの複雑なパラメーターを処理する簡単な方法はありませんか?

フォローアップの質問: パラメータ検索が実行されたコード バージョン (および/またはサブタスクのパッケージ バージョン) の記録を保持することを考えた人はいますか?

このトピックのどこを読むべきかについてのコメントも大歓迎です。

ノート:

これを実行するには、おそらくいくつかのインポートが必要です。

from luigi.task import flatten
import warnings
import pickle
4

1 に答える 1