2

から返された結果をスキップしたいと思いmap_asyncます。それらは記憶の中で成長していますが、私はそれらを必要としません。

ここにいくつかのコードがあります:

def processLine(line):
    #process something
    print "result"
pool = Pool(processes = 8)
for line in sys.stdin:
    lines.append(line)
    if len(lines) >= 100000:
        pool.map_async(processLine, lines, 2000)
pool.close()
pool.join()

数億行のファイルを処理する必要がある場合、python プロセスはメモリ内で数ギガバイトまで増加します。どうすれば解決できますか?

ご協力いただきありがとうございます :)

4

2 に答える 2

3

コードにバグがあります:

for line in sys.stdin:
    lines.append(line)
    if len(lines) >= 100000:
        pool.map_async(processLine, lines, 2000)

linesこれは、100000 行を超えるまで待機します。その後、追加の行ごとpool.map_asyncに 100000 行以上のリスト全体で呼び出されます。

実際に何をしようとしているのかは明確ではありませんが、戻り値が必要ない場合は、pool.apply_asyncではなくを使用してくださいpool.map_async。多分このようなもの:

import multiprocessing as mp

def processLine(line):
    #process something
    print "result"

if __name__ == '__main__':
    pool = mp.Pool(processes = 8)
    for line in sys.stdin:
        pool.apply_async(processLine, args = (line, ))
    pool.close()
    pool.join()
于 2012-12-17T15:17:50.710 に答える
0

はい、あなたが正しい。いくつかのバグがあります

つまり:

def processLine(line):
  #process something
  print "result"
  pool = Pool(processes = 8)

if __name__ == '__main__':
  for line in sys.stdin:
    lines.append(line)
    if len(lines) >= 100000:
      pool.map_async(processLine, lines, 2000)
      lines = [] #to clear buffer
  pool.map_async(processLine, lines, 2000)
  pool.close()
  pool.join()

構成可能なchunk_sizeがあるため、map_asyncを使用したため、処理時間が非常に短い行がたくさんある場合により効率的です。

于 2012-12-17T22:50:45.543 に答える