2

だから私はテキストドキュメントの各行をマルチプロセッシングして読み込もうとしています。660918行あり、すべて同じ長さであることがわかっています。ただし、次のコードでは行の長さが変わっているようで、理由がわかりません。

import multiprocessing

class Worker(multiprocessing.Process):
    def __init__(self,in_q):
        multiprocessing.Process.__init__(self)
        self.in_q = in_q
    def run(self):      
        while True:
            try:
                in_q.get()
                temp_line = short_file.readline()
                temp_line = temp_line.strip().split()
                print len(temp_line)
                self.in_q.task_done()
            except:                              
                break     

if __name__ == "__main__":
    num_proc = 10
    lines = 100000 #660918 is how many lines there actually are
    in_q = multiprocessing.JoinableQueue()
    File = 'HGDP_FinalReport_Forward.txt'
    short_file = open(File)

    for i in range(lines):
        in_q.put(i)    

    for i in range(num_proc):
        worker = Worker(in_q)
        worker.start()
    in_q.join() 
4

2 に答える 2

7

メインプロセスでファイルを開き、子プロセスでそのファイルから読み取ります。あなたはそれをすることはできません。

裏で、ファイルオブジェクトは事実上生のファイルハンドルとメモリバッファです。各プロセスはファイルハンドルを共有しますが、各プロセスには独自のメモリバッファがあります。

すべての行がそれぞれ50バイトで、メモリバッファが4096バイトであるとしましょう。

プロセス1はreadlineを呼び出し、ファイルからバイト0〜4095をそのバッファーに読み取り、そのバッファーをスキャンして50バイトの改行を探し、最初の50バイトを返します。ここまでは順調ですね。

プロセス2はreadlineを呼び出します。これは、ファイルからバイト4096-8191をそのバッファーに読み取り、そのバッファーをスキャンして改行を探します。最初のものは4100にあり、これは5バイトであるため、最初の5バイトを返します。

等々。

理論的には、バッファなしのI / Oを実行することでこれを回避できますが、実際には、なぜですか?メインプロセスの行を読んでみませんか?この問題を回避するだけでなく、並列処理も改善される可能性があります。I/ Oは本質的にシーケンシャルであるため、これらのプロセスはすべてI / Oでブロックされた時間のほとんどを費やします。つまり、何の役にも立ちません。

ちなみに、実行中のループの先頭近くでは、self.in_q.get()の代わりにin_q.get()を実行しています。(in_qは消えることのないグローバル変数であり、self.in_qはそのコピーにすぎないため、これは機能しますが、これに依存する必要はありません。)

于 2012-06-15T00:28:11.033 に答える
1

そこで、プールを使用するように変更しましたが、動作しているようです。次の方がいいですか?

import multiprocessing as mp

File = 'HGDP_FinalReport_Forward.txt'
#short_file = open(File)
test = []

def pro(temp_line):
    temp_line = temp_line.strip().split()
    return len(temp_line)

if __name__ == "__main__":
    with open("HGDP_FinalReport_Forward.txt") as lines:
        pool = mp.Pool(processes = 10)
        t = pool.map(pro,lines.readlines())
    print t
于 2012-06-15T16:31:18.323 に答える