デフォルトで1つのマルチプロセッシングプロセスを作成するスクリプトを作成しました。その後、正常に動作します。複数のプロセスを開始すると、ハングし始めますが、常に同じ場所にあるとは限りません。プログラムのコードは約700行なので、何が起こっているのかを要約してみます。DNA配列を整列させるという最も遅いタスクを並列化することにより、マルチコアを最大限に活用したいと思います。そのために、サブプロセスモジュールを使用してコマンドラインプログラム'hmmsearch'を呼び出します。これは、/ dev / stdinを介してシーケンスをフィードし、次に/ dev/stdoutを介して整列されたシーケンスを読み取ります。stdout / stdinから読み取り/書き込みを行うこれらの複数のサブプロセスインスタンスが原因でハングが発生することを想像しますが、これを実行する最善の方法が本当にわかりません... os.fdopen(...)&osを調べていました.tmpfile()、データをフラッシュできる一時的なファイルハンドルまたはパイプを作成します。ただし、これまで使用したことがなく、サブプロセスモジュールでそれを行う方法を想像することはできません。パイプは高スループットのデータ処理ではるかに優れているため、理想的にはハードドライブの使用を完全にバイパスしたいと思います。これでどんな助けでもとても素晴らしいでしょう!
import multiprocessing, subprocess
from Bio import SeqIO
class align_seq( multiprocessing.Process ):
def __init__( self, inPipe, outPipe, semaphore, options ):
multiprocessing.Process.__init__(self)
self.in_pipe = inPipe ## Sequences in
self.out_pipe = outPipe ## Alignment out
self.options = options.copy() ## Modifiable sub-environment
self.sem = semaphore
def run(self):
inp = self.in_pipe.recv()
while inp != 'STOP':
seq_record , HMM = inp # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
# HMM is a file location.
align_process = subprocess.Popen( ['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE )
self.sem.acquire()
align_process.stdin.write( seq_record.format('fasta') )
align_process.stdin.close()
for seq in SeqIO.parse( align_process.stdout, 'stockholm' ): # get the alignment output
self.out_pipe.send_bytes( seq.seq.tostring() ) # send it to consumer
align_process.wait() # Don't know if there's any need for this??
self.sem.release()
align_process.stdout.close()
inp = self.in_pipe.recv()
self.in_pipe.close() #Close handles so don't overshoot max. limit on number of file-handles.
self.out_pipe.close()
これをデバッグするのにしばらく時間を費やした後、私は常にそこにあり、まだ完全には解決されていない問題を見つけましたが、(デバッグの)プロセスにおける他のいくつかの非効率性を修正しました。2つの初期フィーダー関数があります。このalign_seqクラスと、位置固有のスコア行列(PSM)をディクショナリにロードするファイルパーサーparseHMM()です。次に、メインの親プロセスは、各残基に関連するスコアへのポインターとして(辞書の)辞書を使用して、アラインメントをPSMと比較します。必要なスコアを計算するために、2つの別々のmultiprocessing.Processクラスがあります。1つのクラスlogScore()は、対数オッズ比を計算します(math.exp()を使用)。これを並列化します。計算されたスコアを最後のプロセスsumScore()にキューイングしますこれは、これらのスコアを(math.fsumで)合計し、合計とすべての位置固有のスコアを辞書として親プロセスに返します。ie Queue.put([sum、{残基位置:位置固有のスコア、...}])これは頭を動かすのに非常に混乱しているので(キューが多すぎます!)、読者がなんとかフォローしていることを願っています。 。上記のすべての計算が完了したら、累積スコアをタブ区切りの出力として保存するオプションを提供します。これは、スコアがあるはずのすべての位置のスコアを確実に出力するため、現在(昨夜から)時々壊れている場所です。レイテンシー(コンピューターのタイミングが同期していない)のために、logScoreの最初にキューに入れられたものがsumScoreに到達しないことがあると思います最初。sumScoreがいつタリーを返し、再開するかを知るために、計算を実行した最後のlogScoreプロセスのキューに「endSEQ」を入れました。それなら最後にsumScoreに到達するはずだと思いましたが、常にそうであるとは限りません。たまにしか壊れません。そのため、デッドロックは発生しなくなりましたが、結果を印刷または保存するときにKeyErrorが発生します。KeyErrorが発生することがある理由は、logScoreプロセスごとにキューを作成するためだと思いますが、代わりに、すべて同じキューを使用する必要があります。今、私は次のようなものを持っています:-
class logScore( multiprocessing.Process ):
def __init__( self, inQ, outQ ):
self.inQ = inQ
...
def scoreSequence( processes, HMMPSM, sequenceInPipe ):
process_index = -1
sequence = sequenceInPipe.recv_bytes()
for residue in sequence:
.... ## Get the residue score.
process_index += 1
processes[process_index].inQ.put( residue_score )
## End of sequence
processes[process_index].inQ.put( 'endSEQ' )
logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore( multiprocessing.Queue() , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )
一方、すべてのlogScoreインスタンス間で共有するキューを1つだけ作成する必要があります。すなわち
logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore( scoreSeq_to_logScore , logScore_to_sumScoreQ ) for i in xrange( options['-ncpus'] ) ]
sumScoreProcess = sumScore( logScore_to_sumScoreQ, scoresOut )