私は Ruffus の開発者です。あなたがやろうとしていることを完全に理解しているかどうかはわかりませんが、ここに行きます:
パイプラインの次のステージを実行するために異なる時間がかかるジョブを待機することは、まさに Ruffus が目指していることなので、うまくいけば簡単です。
最初の質問は、事前に、つまりパイプラインが実行される前に、どのファイルが作成されているか知っていますか? あなたがそうしていると仮定することから始めましょう。
from ruffus import *
filenames = ["one.file", "two.file", "three.file"]
呼び出されるたびにファイルを作成するダミー関数を書きましょう。Ruffus では、すべての入力ファイル名と出力ファイル名が最初の 2 つのパラメーターにそれぞれ含まれます。入力ファイル名がないため、関数呼び出しは次のようになります。
create_file(None, "one.file")
create_file(None, "two.file")
create_file(None, "three.file")
create_file の定義は次のようになります。
@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
open(output_file_name, "w").write("dummy file")
これらの各ファイルは、create_file への 3 回の個別の呼び出しで作成されます。必要に応じて、これらを並行して実行できます。
pipeline_run([create_file], multiprocess = 5)
次に、ファイルを結合します。「@Merge」デコレータは、まさにこのために設定されています。それを前の関数にリンクするだけです:
@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
output_file = open(output_file_name, "w")
for i in input_file_names:
output_file.write(open(i).read())
これは、create_file() への 3 回の呼び出しですべてのファイルの準備が整ったときにのみ、merge_file を呼び出します。
コード全体は次のとおりです。
from ruffus import *
filenames = ["one.file", "two.file", "three.file"]
from random import randint
from time import sleep
@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
# simulate create file process of indeterminate complexity
sleep(randint(1,5))
open(output_file_name, "w").write("dummy file")
@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
output_file = open(output_file_name, "w")
for i in input_file_names:
output_file.write(open(i).read())
pipeline_run([merge_file], multiprocess = 5)
そして、これは結果です:
>>> pipeline_run([merge_file], multiprocess = 5)
Job = [None -> two.file] completed
Job = [None -> three.file] completed
Job = [None -> one.file] completed
Completed Task = create_file
Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file