したがって、このテストされていない提案コードが与えられます。プールに 4 つのプロセスがある場合、各プロセスに 25 個の作業が割り当てられるか、または 100 個の作業が実行する作業を探しているプロセスによって 1 つずつ選択され、各プロセスが異なる数の作業、たとえば 30 個の作業を実行できるようになります。 、26、24、20。
まあ、明白な答えはそれをテストすることです。
現状では、ジョブはできるだけ早く終了し、プールされたプロセスがジョブの準備ができたときにジョブを取得したとしても、物事が均等に分散される可能性があるため、テストでは多くのことはわかりません. しかし、それを修正する簡単な方法があります:
import collections
import multiprocessing
import os
import random
import time
def generate_stuff():
for foo in range(100):
yield foo
def process(moo):
#print moo
time.sleep(random.randint(0, 50) / 10.)
return os.getpid()
pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)
数字が「ギザギザ」になっている場合は、プールされたプロセスが新しいジョブを準備完了として取得している必要があることがわかります。(私は明示的chunksize
に 1 に設定して、チャンクが大きすぎないことを確認して、最初にそれぞれが 1 つのチャンクしか取得しないようにします。)
8 コア マシンで実行すると、次のようになります。
Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})
そのため、プロセスはその場で新しいジョブを取得しているようです。
あなたは具体的に4人の労働者について尋ねたので、私はこれに変更Pool()
しPool(4)
て得ました:
Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})
ただし、テストするよりもさらに良い方法があります:ソースを読んでください。
ご覧のとおり、map
を呼び出すだけでmap_async
、一連のバッチが作成され、self._taskqueue
オブジェクト (Queue.Queue
インスタンス) に配置されます。さらに読むと、このキューは他のプロセスと直接共有されていませんが、プロセスが終了して結果を返すたびに、次のジョブをキューからポップしてプロセスに送信するプール マネージャー スレッドがあります。
これは、デフォルトのチャンクサイズが何であるかを調べる方法でもありますmap
。上でリンクされた 2.7 の実装は、len(iterable) / (len(self._pool) * 4)
切り上げられていることを示しています (分数演算を避けるために、それよりも少し冗長です)。つまり、プロセスごとに約 4 つのチャンクに十分な大きさです。しかし、これに頼るべきではありません。ドキュメントは、何らかのヒューリスティックを使用することを漠然とかつ間接的に暗示していますが、それが何であるかについての保証はありません。したがって、「プロセスごとに約 4 つのチャンク」が本当に必要な場合は、明示的に計算してください。より現実的には、デフォルト以外のものが必要な場合は、(計算、推測、またはプロファイリングによって) 解決しようとしているドメイン固有の値が必要になる可能性があります。