1

私は ruffus を使ってパイプラインを書いています。並行して何度も呼び出される関数があり、複数のファイルが作成されます。これらすべてのファイルが作成された後に呼び出される関数「combineFiles()」を作成したいと思います。これらはクラスター上で並行して実行されるため、すべてが同時に終了するわけではありません。作成する必要がある一連のファイル名を返す関数 'getFilenames()' を作成しましたが、結合ファイルが存在するまで待機させるにはどうすればよいですか?

私は次のことを試しました:

@pipelineFunction
@files(getFilenames)
def combineFiles(filenames):
  # I should only be called if every file in the list 'filenames' exists

デコレータも試しました:

@merge(getFilenames)

しかし、これも機能しません。getFilenames によって指定されたファイルが作成される前に、combineFiles は依然として誤って呼び出されます。それらのファイルがそこにあることを条件として、結合ファイルを作成するにはどうすればよいですか?

ありがとう。

4

1 に答える 1

2

私は Ruffus の開発者です。あなたがやろうとしていることを完全に理解しているかどうかはわかりませんが、ここに行きます:

パイプラインの次のステージを実行するために異なる時間がかかるジョブを待機することは、まさに Ruffus が目指していることなので、うまくいけば簡単です。

最初の質問は、事前に、つまりパイプラインが実行される前に、どのファイルが作成されているか知っていますか? あなたがそうしていると仮定することから始めましょう。

from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

呼び出されるたびにファイルを作成するダミー関数を書きましょう。Ruffus では、すべての入力ファイル名と出力ファイル名が最初の 2 つのパラメーターにそれぞれ含まれます。入力ファイル名がないため、関数呼び出しは次のようになります。

create_file(None, "one.file")
create_file(None, "two.file")
create_file(None, "three.file")

create_file の定義は次のようになります。

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    open(output_file_name, "w").write("dummy file")

これらの各ファイルは、create_file への 3 回の個別の呼び出しで作成されます。必要に応じて、これらを並行して実行できます。

pipeline_run([create_file], multiprocess = 5)

次に、ファイルを結合します。「@Merge」デコレータは、まさにこのために設定されています。それを前の関数にリンクするだけです:

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    output_file = open(output_file_name, "w")
    for i in input_file_names:
        output_file.write(open(i).read())

これは、create_file() への 3 回の呼び出しですべてのファイルの準備が整ったときにのみ、merge_file を呼び出します。

コード全体は次のとおりです。

from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

from random import randint
from time import sleep

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    # simulate create file process of indeterminate complexity
    sleep(randint(1,5))
    open(output_file_name, "w").write("dummy file")

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    output_file = open(output_file_name, "w")
    for i in input_file_names:
        output_file.write(open(i).read())


pipeline_run([merge_file], multiprocess = 5)

そして、これは結果です:

>>> pipeline_run([merge_file], multiprocess = 5)

    Job = [None -> two.file] completed
    Job = [None -> three.file] completed
    Job = [None -> one.file] completed
Completed Task = create_file
    Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file
于 2010-03-26T13:03:06.180 に答える