9

Pool.apply_asyncを使用して(大きなパラメーターを使用して)多数のタスクを実行すると、プロセスが割り当てられて待機状態になり、待機中のプロセスの数に制限はありません。これは、以下の例のように、すべてのメモリを消費することで終わる可能性があります。

import multiprocessing
import numpy as np

def f(a,b):
    return np.linalg.solve(a,b)

def test():

    p = multiprocessing.Pool()
    for _ in range(1000):
        p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000)))
    p.close()
    p.join()

if __name__ == '__main__':
    test()

待機中のプロセスの数が制限され、待機中のキューがいっぱいになるとPool.apply_asyncがブロックされるように、待機中のキューを制限する方法を探しています。

4

4 に答える 4

7

multiprocessing.Poolオプションのパラメーターを受け取るタイプの_taskqueueメンバーがあります。残念ながら、パラメータを設定せずに構築します。multiprocessing.Queuemaxsizemaxsize

コンストラクターに渡されるmultiprocessing.Poolコピーアンドペーストでサブクラス化することをお勧めします。multiprocessing.Pool.__init__maxsize_taskqueue

オブジェクト(プールまたはキューのいずれか)にモンキーパッチを適用することもできますが、モンキーパッチを適用する必要があるためpool._taskqueue._maxsizepool._taskqueue._sem非常に脆弱になります。

pool._taskqueue._maxsize = maxsize
pool._taskqueue._sem = BoundedSemaphore(maxsize)
于 2012-06-15T18:36:37.570 に答える
3

pool._taskqueueが目的のサイズを超えている場合は待ちます。

import multiprocessing
import time

import numpy as np


def f(a,b):
    return np.linalg.solve(a,b)

def test(max_apply_size=100):
    p = multiprocessing.Pool()
    for _ in range(1000):
        p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000)))

        while p._taskqueue.qsize() > max_apply_size:
            time.sleep(1)

    p.close()
    p.join()

if __name__ == '__main__':
    test()
于 2017-02-25T20:26:13.410 に答える
1

これがトップアンサーに代わるモンキーパッチです:

import queue
from multiprocessing.pool import ThreadPool as Pool


class PatchedQueue():
  """
  Wrap stdlib queue and return a Queue(maxsize=...)
  when queue.SimpleQueue is accessed
  """

  def __init__(self, simple_queue_max_size=5000):
    self.simple_max = simple_queue_max_size  

  def __getattr__(self, attr):
    if attr == "SimpleQueue":
      return lambda: queue.Queue(maxsize=self.simple_max)
    return getattr(queue, attr)


class BoundedPool(Pool):
  # Override queue in this scope to use the patcher above
  queue = PatchedQueue()

pool = BoundedPool()
pool.apply_async(print, ("something",))

queue.SimpleQueueこれは、マルチプロセッシングプールがタスクキューのセットアップに使用しているPython3.8で期待どおりに機能しています。の実装はmultiprocessing.Pool2.7以降変更されているようです

于 2021-03-27T19:11:54.463 に答える
0

queue.put()この場合の代わりに、maxsizeパラメーターを使用して明示的なキューを追加して使用することができますpool.apply_async()。次に、ワーカープロセスは次のことができます。

for a, b in iter(queue.get, sentinel):
    # process it

メモリ内に作成される入力引数/結果の数を、アクティブなワーカープロセスのおおよその数に制限する場合は、次のpool.imap*()メソッドを使用できます。

#!/usr/bin/env python
import multiprocessing
import numpy as np

def f(a_b):
    return np.linalg.solve(*a_b)

def main():
    args = ((np.random.rand(1000,1000), np.random.rand(1000))
            for _ in range(1000))
    p = multiprocessing.Pool()
    for result in p.imap_unordered(f, args, chunksize=1):
        pass
    p.close()
    p.join()

if __name__ == '__main__':
    main()
于 2012-06-15T22:45:45.433 に答える