10

これまで、使用する必要があるときはいつでもmultiprocessing、手動で「プロセスプール」を作成し、すべてのサブプロセスと作業キューを共有することで使用してきました。

例えば:

from multiprocessing import Process, Queue


class MyClass:

    def __init__(self, num_processes):
        self._log         = logging.getLogger()
        self.process_list = []
        self.work_queue   = Queue()
        for i in range(num_processes):
            p_name = 'CPU_%02d' % (i+1)
            self._log.info('Initializing process %s', p_name)
            p = Process(target = do_stuff,
                        args   = (self.work_queue, 'arg1'),
                        name   = p_name)

このようにして、サブプロセスによって消費されるものをキューに追加できます。Queue.qsize()次に、 :をチェックすることで、処理がどの程度進んだかを監視できます。

    while True:
        qsize = self.work_queue.qsize()
        if qsize == 0:
            self._log.info('Processing finished')
            break
        else:
            self._log.info('%d simulations still need to be calculated', qsize)

今、私はそれmultiprocessing.Poolがこのコードをかなり単純化できると思います。

私が見つけられなかったのは、まだ行われていない「作業」の量をどのように監視できるかということです。

次の例を見てください。

from multiprocessing import Pool


class MyClass:

    def __init__(self, num_processes):
        self.process_pool = Pool(num_processes)
        # ...
        result_list = []
        for i in range(1000):            
            result = self.process_pool.apply_async(do_stuff, ('arg1',))
            result_list.append(result)
        # ---> here: how do I monitor the Pool's processing progress?
        # ...?

何か案は?

4

4 に答える 4

16

Managerキューを使用します。これは、ワーカープロセス間で共有されるキューです。通常のキューを使用する場合、各ワーカーによってピクルスおよびピクルス解除されるため、コピーされるため、各ワーカーがキューを更新することはできません。

次に、ワーカーにキューにデータを追加してもらい、ワーカーが作業している間、キューの状態を監視します。これを使用してこれを行う必要がありmap_asyncます。これにより、結果全体の準備ができたことを確認でき、監視ループを中断できます。

例:

import time
from multiprocessing import Pool, Manager


def play_function(args):
    """Mock function, that takes a single argument consisting
    of (input, queue). Alternately, you could use another function
    as a wrapper.
    """
    i, q = args
    time.sleep(0.1)  # mock work
    q.put(i)
    return i

p = Pool()
m = Manager()
q = m.Queue()

inputs = range(20)
args = [(i, q) for i in inputs]
result = p.map_async(play_function, args)

# monitor loop
while True:
    if result.ready():
        break
    else:
        size = q.qsize()
        print(size)
        time.sleep(0.1)

outputs = result.get()
于 2013-04-14T15:21:25.320 に答える
3

私はasync_callのために以下の解決策を思いついた。

ささいなおもちゃのスクリプトの例ですが、広く適用する必要があると思います。

基本的に、無限ループでは、リストジェネレーターで結果オブジェクトの準備完了値をポーリングし、合計して、ディスパッチされたプールタスクの残りの数を取得します。

残りがなくなったら、break()&close()を実行します。

必要に応じて、スリープインループを追加します。

上記のソリューションと同じ原則ですが、キューはありません。プールに最初に送信したタスクの数も追跡している場合は、完了率などを計算できます。

import multiprocessing
import os
import time
from random import randrange


def worker():
    print os.getpid()

    #simulate work
    time.sleep(randrange(5))

if __name__ == '__main__':

    pool = multiprocessing.Pool(processes=8)
    result_objs = []

    print "Begin dispatching work"

    task_count = 10
    for x in range(task_count):
        result_objs.append(pool.apply_async(func=worker))

    print "Done dispatching work"

    while True:
        incomplete_count = sum(1 for x in result_objs if not x.ready())

        if incomplete_count == 0:
            print "All done"
            break

        print str(incomplete_count) + " Tasks Remaining"
        print str(float(task_count - incomplete_count) / task_count * 100) + "% Complete"
        time.sleep(.25)

    pool.close()
    pool.join()
于 2014-08-08T17:18:16.483 に答える
2

私は同じ問題を抱えていて、MapResultオブジェクトのやや単純な解決策を思いつきました(内部のMapResultデータを使用していますが)

pool = Pool(POOL_SIZE)

result = pool.map_async(get_stuff, todo)
while not result.ready():
    remaining = result._number_left * result._chunksize
    sys.stderr.write('\r\033[2KRemaining: %d' % remaining)
    sys.stderr.flush()
    sleep(.1)

print >> sys.stderr, '\r\033[2KRemaining: 0'

チャンクサイズは処理するアイテムの数に応じて切り上げられることが多いため、残りの値は必ずしも正確ではないことに注意してください。

これを回避するには、pool.map_async(get_stuff, todo, chunksize=1)

于 2015-12-13T12:20:54.947 に答える
0

ドキュメントから、あなたがしたいことはresult、リストまたは他のシーケンスであなたのを収集し、次に結果リストのチェックを繰り返しreadyて出力リストを構築することであるように私には見えます。次に、準備完了状態にない残りの結果オブジェクトの数を、ディスパッチされたジョブの総数と比較することにより、処理ステータスを計算できます。http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResultを参照してください

于 2012-12-03T19:48:39.760 に答える