0

デフォルトで1つのマルチプロセッシングプロセスを作成するスクリプトを作成しました。その後、正常に動作します。複数のプロセスを開始すると、ハングし始めますが、常に同じ場所にあるとは限りません。プログラムのコードは約700行なので、何が起こっているのかを要約してみます。DNA配列を整列させるという最も遅いタスクを並列化することにより、マルチコアを最大限に活用したいと思います。そのために、サブプロセスモジュールを使用してコマンドラインプログラム'hmmsearch'を呼び出します。これは、/ dev / stdinを介してシーケンスをフィードし、次に/ dev/stdoutを介して整列されたシーケンスを読み取ります。stdout / stdinから読み取り/書き込みを行うこれらの複数のサブプロセスインスタンスが原因でハングが発生することを想像しますが、これを実行する最善の方法が本当にわかりません... os.fdopen(...)&osを調べていました.tmpfile()、データをフラッシュできる一時的なファイルハンドルまたはパイプを作成します。ただし、これまで使用したことがなく、サブプロセスモジュールでそれを行う方法を想像することはできません。パイプは高スループットのデータ処理ではるかに優れているため、理想的にはハードドライブの使用を完全にバイパスしたいと思います。これでどんな助けでもとても素晴らしいでしょう!

import multiprocessing, subprocess
from Bio import SeqIO

class align_seq( multiprocessing.Process ):
    def __init__( self, inPipe, outPipe, semaphore, options ):
        multiprocessing.Process.__init__(self)
        self.in_pipe = inPipe          ## Sequences in
        self.out_pipe = outPipe        ## Alignment out
        self.options = options.copy()  ## Modifiable sub-environment
        self.sem = semaphore

    def run(self):
        inp = self.in_pipe.recv()
        while inp != 'STOP':
            seq_record , HMM = inp  # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
                                    # HMM is a file location.
            align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
            self.sem.acquire()
            align_process.stdin.write( seq_record.format('fasta') )
            align_process.stdin.close()
            for seq in SeqIO.parse( align_process.stdout, 'stockholm' ):  # get the alignment output
                self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
            align_process.wait()   # Don't know if there's any need for this??
            self.sem.release()
            align_process.stdout.close()
            inp = self.in_pipe.recv()  
        self.in_pipe.close()    #Close handles so don't overshoot max. limit on number of file-handles.
        self.out_pipe.close()   

これをデバッグするのにしばらく時間を費やした後、私は常にそこにあり、まだ完全には解決されていない問題を見つけましたが、(デバッグの)プロセスにおける他のいくつかの非効率性を修正しました。2つの初期フィーダー関数があります。このalign_seqクラスと、位置固有のスコア行列(PSM)をディクショナリにロードするファイルパーサーparseHMM()です。次に、メインの親プロセスは、各残基に関連するスコアへのポインターとして(辞書の)辞書を使用して、アラインメントをPSMと比較します。必要なスコアを計算するために、2つの別々のmultiprocessing.Processクラスがあります。1つのクラスlogScore()は、対数オッズ比を計算します(math.exp()を使用)。これを並列化します。計算されたスコアを最後のプロセスsumScore()にキューイングしますこれは、これらのスコアを(math.fsumで)合計し、合計とすべての位置固有のスコアを辞書として親プロセスに返します。ie Queue.put([sum、{残基位置:位置固有のスコア、...}])これは頭を動かすのに非常に混乱しているので(キューが多すぎます!)、読者がなんとかフォローしていることを願っています。 。上記のすべての計算が完了したら、累積スコアをタブ区切りの出力として保存するオプションを提供します。これは、スコアがあるはずのすべての位置のスコアを確実に出力するため、現在(昨夜から)時々壊れている場所です。レイテンシー(コンピューターのタイミングが同期していない)のために、logScoreの最初にキューに入れられたものがsumScoreに到達しないことがあると思います最初。sumScoreがいつタリーを返し、再開するかを知るために、計算を実行した最後のlogScoreプロセスのキューに「endSEQ」を入れました。それなら最後にsumScoreに到達するはずだと思いましたが、常にそうであるとは限りません。たまにしか壊れません。そのため、デッドロックは発生しなくなりましたが、結果を印刷または保存するときにKeyErrorが発生します。KeyErrorが発生することがある理由は、logScoreプロセスごとにキューを作成するためだと思いますが、代わりに、すべて同じキューを使用する必要があります。今、私は次のようなものを持っています:-

class logScore( multiprocessing.Process ):
    def __init__( self, inQ, outQ ):
        self.inQ = inQ
        ...

def scoreSequence( processes, HMMPSM, sequenceInPipe ):
    process_index = -1
    sequence = sequenceInPipe.recv_bytes()
    for residue in sequence:
        .... ## Get the residue score.
        process_index += 1
        processes[process_index].inQ.put( residue_score )
    ## End of sequence
    processes[process_index].inQ.put( 'endSEQ' )


logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )

一方、すべてのlogScoreインスタンス間で共有するキューを1つだけ作成する必要があります。すなわち

logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
4

3 に答える 3

2

これはパイプラインの仕組みではありません...しかし、気を楽にするために、サブプロセスのドキュメントからの抜粋を次に示します。

stdin、stdout、およびstderrは、それぞれ、実行されたプログラムの標準入力、標準出力、および標準エラーファイルハンドルを指定します。有効な値は、PIPE、既存のファイル記述子(正の整数)、既存のファイルオブジェクト、およびNoneです。PIPEは、子への新しいパイプを作成する必要があることを示します。Noneを使用すると、リダイレクトは発生しません。子のファイルハンドルは親から継承されます。

障害が発生する可能性が最も高い領域は、メインプロセスとの通信、またはセマフォの管理にあります。バグが原因で、状態遷移/同期が期待どおりに進行していない可能性がありますか?各ブロッキング呼び出しの前後にlogging/printステートメントを追加してデバッグすることをお勧めします。ここでは、メインプロセスと通信し、セマフォを取得/解放して、問題が発生した場所を絞り込みます。

また、私は興味があります-セマフォは絶対に必要ですか?

于 2011-01-06T21:45:19.367 に答える
1

また、単純なタスクを並列化したかったので、そのために小さなPythonスクリプトを作成しました。あなたは見ることができます:http: //bioinf.comav.upv.es/psubprocess/index.html

必要なものよりも少し一般的ですが、単純なタスクの場合は非常に使いやすいです。それはあなたにとって少なくともいくらかの誹謗中傷かもしれません。

ホセブランカ

于 2011-01-07T11:32:50.073 に答える
0

サブプロセスでデッドロックが発生している可能性があります。待機するのではなく、通信方式を使用してみましたか? http://docs.python.org/library/subprocess.html

于 2011-01-07T14:28:36.673 に答える