18

リストからファイルを開き、そのファイル内のテキストに対して何かを行うスクリプトがあります。この操作を並列化するために、python multiprocessing と Pool を使用しています。スクリプトの抽象化は次のとおりです。

import os
from multiprocessing import Pool

results = []
def testFunc(files):
    for file in files:
        print "Working in Process #%d" % (os.getpid())
        #This is just an illustration of some logic. This is not what I'm actually doing.
        for line in file:
            if 'dog' in line:
                results.append(line)

if __name__=="__main__":
    p = Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    results = p.apply_async(testFunc, args = (files,))
    results2 = results.get()

これを実行すると、プロセス ID の出力は反復ごとに同じになります。基本的に私がやろうとしているのは、入力リストの各要素を取り出して別々のプロセスにフォークすることですが、1 つのプロセスがすべての作業を行っているようです。

4

2 に答える 2

31
  • apply_asyncプールに1つのタスクを実行します。apply_asyncより多くのプロセッサを実行するには、何度も呼び出す必要があり ます。
  • 両方のプロセスが同じリストに書き込もうとしないようにしてください results。プールワーカーは別々のプロセスであるため、2つは同じリストに書き込みません。これを回避する1つの方法は、出力キューを使用することです。自分で設定することも、apply_asyncのコールバックを使用してキューを設定することもできます。apply_async関数が完了すると、コールバックを呼び出します。
  • map_asyncの代わりに使用することもできますapply_asyncが、リストのリストが表示され、フラット化する必要があります。

したがって、代わりに次のようなものを試してください。

import os
import multiprocessing as mp

results = []   

def testFunc(file):
    result = []
    print "Working in Process #%d" % (os.getpid())
    # This is just an illustration of some logic. This is not what I'm
    # actually doing.
    with open(file, 'r') as f:
        for line in f:
            if 'dog' in line:
                result.append(line)
    return result


def collect_results(result):
    results.extend(result)

if __name__ == "__main__":
    p = mp.Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    for f in files:
        p.apply_async(testFunc, args=(f, ), callback=collect_results)
    p.close()
    p.join()
    print(results)
于 2012-09-18T19:55:20.313 に答える
8

たぶんこの場合、あなたは使うべきですmap_async

import os
from multiprocessing import Pool

results = []
def testFunc(file):
    message =  ("Working in Process #%d" % (os.getpid()))
    #This is just an illustration of some logic. This is not what I'm actually doing.
    for line in file:
        if 'dog' in line:
            results.append(line)
    return message

if __name__=="__main__":
    print("saddsf")
    p = Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    results = p.map_async(testFunc, files)
    print(results.get())
于 2012-09-18T19:56:10.077 に答える