私はPythonにかなり慣れていません。標準入力でテキスト行を読み取り、何らかの方法で変換してデータベースに書き込むために multiprocessing モジュールを使用しています。これが私のコードのスニペットです:
batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
batch.append(content)
if len(batch) >= 10000:
pool.apply_async(insert, args=(batch,i+1))
batch = []
pool.apply_async(insert, args=(batch,i))
pool.close()
pool.join()
Pythonプログラムにパイプする巨大な入力ファイル(数億行)を処理するまで、すべて正常に動作します。ある時点で、データベースが遅くなると、メモリがいっぱいになります。
いくつか遊んだ後、pool.apply_async と pool.map_async が決してブロックされないことが判明したため、処理される呼び出しのキューがますます大きくなります。
私の問題に対する正しいアプローチは何ですか? 特定のキューの長さに達するとすぐに、pool.apply_async 呼び出しをブロックする、設定できるパラメーターが必要です。AFAIR Java では、その目的のために ThreadPoolExecutor に固定長の BlockingQueue を与えることができます。
ありがとう!