23

mapイテラブルの場合multiprocessing.Pool、開始時にプール内の各プロセスのキューに反復が分割されますか、またはプロセスが解放されたときにタスクが取得される共通のキューがありますか?

    def generate_stuff():
        for foo in range(100):
             yield foo

    def process(moo):
        print moo

    pool = multiprocessing.Pool()
    pool.map(func=process, iterable=generate_stuff())
    pool.close()

したがって、このテストされていない提案コードが与えられます。プールに 4 つのプロセスがある場合、各プロセスに 25 個の作業が割り当てられるか、または 100 個の作業が実行する作業を探しているプロセスによって 1 つずつ選択され、各プロセスが異なる数の作業、たとえば 30 個の作業を実行できるようになります。 、26、24、20。

4

3 に答える 3

27

したがって、このテストされていない提案コードが与えられます。プールに 4 つのプロセスがある場合、各プロセスに 25 個の作業が割り当てられるか、または 100 個の作業が実行する作業を探しているプロセスによって 1 つずつ選択され、各プロセスが異なる数の作業、たとえば 30 個の作業を実行できるようになります。 、26、24、20。

まあ、明白な答えはそれをテストすることです。

現状では、ジョブはできるだけ早く終了し、プールされたプロセスがジョブの準備ができたときにジョブを取得したとしても、物事が均等に分散される可能性があるため、テストでは多くのことはわかりません. しかし、それを修正する簡単な方法があります:

import collections
import multiprocessing
import os
import random
import time

def generate_stuff():
    for foo in range(100):
        yield foo

def process(moo):
    #print moo
    time.sleep(random.randint(0, 50) / 10.)
    return os.getpid()

pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)

数字が「ギザギザ」になっている場合は、プールされたプロセスが新しいジョブを準備完了として取得している必要があることがわかります。(私は明示的chunksizeに 1 に設定して、チャンクが大きすぎないことを確認して、最初にそれぞれが 1 つのチャンクしか取得しないようにします。)

8 コア マシンで実行すると、次のようになります。

Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})

そのため、プロセスはその場で新しいジョブを取得しているようです。

あなたは具体的に4人の労働者について尋ねたので、私はこれに変更Pool()Pool(4)て得ました:

Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})

ただし、テストするよりもさらに良い方法があります:ソースを読んでください。

ご覧のとおり、mapを呼び出すだけでmap_async、一連のバッチが作成され、self._taskqueueオブジェクト (Queue.Queueインスタンス) に配置されます。さらに読むと、このキューは他のプロセスと直接共有されていませんが、プロセスが終了して結果を返すたびに、次のジョブをキューからポップしてプロセスに送信するプール マネージャー スレッドがあります。

これは、デフォルトのチャンクサイズが何であるかを調べる方法でもありますmap。上でリンクされた 2.7 の実装は、len(iterable) / (len(self._pool) * 4)切り上げられていることを示しています (分数演算を避けるために、それよりも少し冗長です)。つまり、プロセスごとに約 4 つのチャンクに十分な大きさです。しかし、これに頼るべきではありません。ドキュメントは、何らかのヒューリスティックを使用することを漠然とかつ間接的に暗示していますが、それが何であるかについての保証はありません。したがって、「プロセスごとに約 4 つのチャンク」が本当に必要な場合は、明示的に計算してください。より現実的には、デフォルト以外のものが必要な場合は、(計算、推測、またはプロファイリングによって) 解決しようとしているドメイン固有の値が必要になる可能性があります。

于 2012-11-07T07:14:55.303 に答える
3

http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.map

map(func, iterable[, chunksize])

このメソッドは、イテラブルをいくつかのチャンクに分割し、個別のタスクとしてプロセスプールに送信します。これらのチャンクの(おおよその)サイズは、chunksizeを正の整数に設定することで指定できます。

前のチャンクが完了すると、プロセスがキューから次のチャンクを取得すると思います。

デフォルトchunksizeはの長さに依存し、iterableチャンクの数がプロセスの数の約4倍になるように選択されます。(ソース)

于 2012-11-07T07:02:17.513 に答える
1

モジュールのソースコードchunksizeを見ずに Python 実装で使用されていると見積もるには、次を実行します。multiprocessing

#!/usr/bin/env python
import multiprocessing as mp
from itertools import groupby

def work(index):
    mp.get_logger().info(index)
    return index, mp.current_process().name

if __name__ == "__main__":
    import logging
    import sys
    logger = mp.log_to_stderr()

    # process cmdline args
    try:
        sys.argv.remove('--verbose')
    except ValueError:
        pass  # not verbose
    else:
        logger.setLevel(logging.INFO)  # verbose
    nprocesses, nitems = int(sys.argv.pop(1)), int(sys.argv.pop(1))
    # choices: 'map', 'imap', 'imap_unordered'
    map_name = sys.argv[1] if len(sys.argv) > 1 else 'map'
    kwargs = dict(chunksize=int(sys.argv[2])) if len(sys.argv) > 2 else {}

    # estimate chunksize used
    max_chunksize = 0
    map_func = getattr(mp.Pool(nprocesses), map_name)
    for _, group in groupby(sorted(map_func(work, range(nitems), **kwargs),
                                   key=lambda x: x[0]),  # sort by index
                            key=lambda x: x[1]):  # group by process name
        max_chunksize = max(max_chunksize, len(list(group)))
    print("%s: max_chunksize %d" % (map_name, max_chunksize))

はデフォルトimapimap_unordered使用され、 forは に依存し(プロセスごとのチャンク数は固定されていません)、Python のバージョンに依存することが示されています。パラメータが指定されている場合、すべての関数がパラメータを考慮します。chunksize=1max_chunksizemapnprocessesnitemmax_chunksize*map*chunksize

使用法

$ ./estimate_chunksize.py nprocesses nitems [map_name [chunksize]] [--verbose]

個々のジョブがどのように分散されているかを確認するには; パラメータを指定し--verboseます。

于 2012-11-07T08:51:07.387 に答える