11

私はPythonにかなり慣れていません。標準入力でテキスト行を読み取り、何らかの方法で変換してデータベースに書き込むために multiprocessing モジュールを使用しています。これが私のコードのスニペットです:

batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        pool.apply_async(insert, args=(batch,i+1))
        batch = []
pool.apply_async(insert, args=(batch,i))
pool.close()
pool.join()

Pythonプログラムにパイプする巨大な入力ファイル(数億行)を処理するまで、すべて正常に動作します。ある時点で、データベースが遅くなると、メモリがいっぱいになります。

いくつか遊んだ後、pool.apply_async と pool.map_async が決してブロックされないことが判明したため、処理される呼び出しのキューがますます大きくなります。

私の問題に対する正しいアプローチは何ですか? 特定のキューの長さに達するとすぐに、pool.apply_async 呼び出しをブロックする、設定できるパラメーターが必要です。AFAIR Java では、その目的のために ThreadPoolExecutor に固定長の BlockingQueue を与えることができます。

ありがとう!

4

4 に答える 4

13

および関数はapply_asyncmap_asyncメイン プロセスをブロックしないように設計されています。そうするために、は残念ながらサイズを変更できないPool内部を維持します。Queue

問題を解決する方法はSemaphore、キューを必要なサイズで初期化することです。プールにフィードする前、およびワーカーがタスクを完了した後に、セマフォを取得して解放します。

Python 2.6 以降で動作する例を次に示します。

from threading import Semaphore
from multiprocessing import Pool

def task_wrapper(f):
    """Python2 does not allow a callback for method raising exceptions,
    this wrapper ensures the code run into the worker will be exception free.

    """
    try:
        return f()
    except:
        return None

class TaskManager(object):
    def __init__(self, processes, queue_size):
        self.pool = Pool(processes=processes)
        self.workers = Semaphore(processes + queue_size)

    def new_task(self, f):
        """Start a new task, blocks if queue is full."""
        self.workers.acquire()
        self.pool.apply_async(task_wrapper, args=(f, ), callback=self.task_done))

    def task_done(self):
        """Called once task is done, releases the queue is blocked."""
        self.workers.release()

プールの実装を使用した別の例。concurrent.futures

于 2013-09-09T13:47:37.120 に答える
11

誰かがここにたどり着いた場合に備えて、これが私が問題を解決した方法です:私はmultiprocessing.Poolの使用をやめました。これが私が今それをする方法です:

#set amount of concurrent processes that insert db data
processes = multiprocessing.cpu_count() * 2

#setup batch queue
queue = multiprocessing.Queue(processes * 2)

#start processes
for _ in range(processes): multiprocessing.Process(target=insert, args=(queue,)).start() 

#fill queue with batches    
batch=[]
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        queue.put((batch,i+1))
        batch = []
if batch:
    queue.put((batch,i+1))

#stop processes using poison-pill
for _ in range(processes): queue.put((None,None))

print "all done."

挿入メソッドでは、各バッチの処理は、ポイズンピルを受け取るまでキューからプルするループにラップされます。

while True:
    batch, end = queue.get()
    if not batch and not end: return #poison pill! complete!
    [process the batch]
print 'worker done.'
于 2012-03-08T15:11:07.580 に答える
2

apply_asyncオブジェクトを返します。これは次のAsyncResult場合に実行できますwait

if len(batch) >= 10000:
    r = pool.apply_async(insert, args=(batch, i+1))
    r.wait()
    batch = []

これをよりクリーンな方法で実行したい場合は、10000のmultiprocessing.Queueaを使用し、そのようなキューからフェッチするクラスを派生させる必要があります。maxsizeWorkermultiprocessing.Process

于 2012-03-07T13:07:12.087 に答える