ここでは、通信のオーバーヘッドと計算の高速化の間の予測できない競争が間違いなく問題になっています。あなたが観察しているものは完全に素晴らしいです。正味のスピードアップが得られるかどうかは多くの要因に依存し、(あなたがしたように)適切に定量化する必要があるものです。
では、なぜmultiprocessing
あなたの場合は「予想外に遅い」のでしょうか。 multiprocessing
と関数はmap
、map_async
実際には、親プロセスと子プロセスを接続するパイプを介してPythonオブジェクトを前後にピクルスします。これにはかなりの時間がかかる場合があります。その間、子プロセスはほとんど何もする必要がありません。これはで表示されhtop
ます。異なるシステム間では、パイプトランスポートのパフォーマンスにかなりの違いがある可能性があります。そのため、一部の人にとっては、プールコードが単一のCPUコードよりも高速ですが、そうではありません(他の要因がここで関係する可能性がありますが、これは効果を説明するための例)。
それを速くするためにあなたは何ができますか?
POSIX準拠のシステムでは入力をピクルスにしないでください。
Unixを使用している場合は、POSIXのプロセスフォークの動作(書き込み時にメモリをコピーする)を利用して、親子通信のオーバーヘッドを回避できます。
グローバルにアクセス可能な変数の親プロセスで作業するジョブ入力(たとえば、大きなマトリックスのリスト)を作成します。次に、自分自身を呼び出してワーカープロセスを作成しmultiprocessing.Process()
ます。子では、グローバル変数からジョブ入力を取得します。簡単に言えば、これにより、子は通信のオーバーヘッドなしで親のメモリにアクセスできます(*、以下の説明)。たとえば、を介して結果を親に送り返しmultiprocessing.Queue
ます。これにより、特に出力が入力に比べて小さい場合に、通信のオーバーヘッドを大幅に節約できます。multiprocessing.Process()
親の状態を継承しないまったく新しいPythonプロセスが作成されるため、このメソッドはWindowsなどでは機能しません。
numpyマルチスレッドを利用してください。
実際の計算タスクによっては、関与multiprocessing
してもまったく役に立たない場合があります。numpyを自分でコンパイルし、OpenMPディレクティブを有効にすると、ラージマトリックスでの操作は、それ自体で非常に効率的にマルチスレッド化される可能性があります(そして、多くのCPUコアに分散されます。GILはここでは制限要因ではありません)。基本的に、これはnumpy/scipyのコンテキストで取得できる複数のCPUコアの最も効率的な使用法です。
*一般的に、子供は親の記憶に直接アクセスすることはできません。ただし、その後fork()
、親と子は同等の状態になります。親のメモリ全体をRAM内の別の場所にコピーするのはばかげています。そのため、コピーオンライトの原則が採用されています。子がメモリの状態を変更しない限り、実際には親のメモリにアクセスします。変更があった場合にのみ、対応するビットとピースが子のメモリ空間にコピーされます。
主な編集:
複数のワーカープロセスで大量の入力データを処理し、「1。POSIX準拠のシステムで入力を選択しないでください」というアドバイスに従うコードを追加しましょう。さらに、ワーカーマネージャー(親プロセス)に転送される情報の量は非常に少ないです。この例の重い計算部分は、単一値分解です。OpenMPを多用することができます。この例を複数回実行しました。
- 1つ、2つ、または4つのワーカープロセスとを使用すると
OMP_NUM_THREADS=1
、各ワーカープロセスは100%の最大負荷を作成します。そこでは、前述の労働者数-計算時間のスケーリング動作はほぼ線形であり、正味の高速化係数は関与する労働者の数に対応します。
- 1つ、2つ、または4つのワーカープロセスと
OMP_NUM_THREADS=4
を使用すると、各プロセスが最大400%の負荷を作成します(4つのOpenMPスレッドを生成することにより)。私のマシンには16個の実際のコアがあるため、それぞれ最大400%の負荷を持つ4つのプロセスで、マシンからほぼ最大のパフォーマンスが得られます。スケーリングは完全に線形ではなくなり、スピードアップ係数は関係するワーカーの数ではありませんが、絶対計算時間OMP_NUM_THREADS=1
はワーカープロセスの数に比べて大幅に短縮され、時間は依然として大幅に短縮されます。
- より大きな入力データ、4コア、および
OMP_NUM_THREADS=4
。その結果、平均システム負荷は1253%になります。
- 前回と同じ設定で1回ですが、
OMP_NUM_THREADS=5
。その結果、平均システム負荷は1598%になります。これは、16コアのマシンからすべてを取得したことを示しています。ただし、実際の計算の実時間は、後者の場合と比較して改善されません。
コード:
import os
import time
import math
import numpy as np
from numpy.linalg import svd as svd
import multiprocessing
# If numpy is compiled for OpenMP, then make sure to control
# the number of OpenMP threads via the OMP_NUM_THREADS environment
# variable before running this benchmark.
MATRIX_SIZE = 1000
MATRIX_COUNT = 16
def rnd_matrix():
offset = np.random.randint(1,10)
stretch = 2*np.random.rand()+0.1
return offset + stretch * np.random.rand(MATRIX_SIZE, MATRIX_SIZE)
print "Creating input matrices in parent process."
# Create input in memory. Children access this input.
INPUT = [rnd_matrix() for _ in xrange(MATRIX_COUNT)]
def worker_function(result_queue, worker_index, chunk_boundary):
"""Work on a certain chunk of the globally defined `INPUT` list.
"""
result_chunk = []
for m in INPUT[chunk_boundary[0]:chunk_boundary[1]]:
# Perform single value decomposition (CPU intense).
u, s, v = svd(m)
# Build single numeric value as output.
output = int(np.sum(s))
result_chunk.append(output)
result_queue.put((worker_index, result_chunk))
def work(n_workers=1):
def calc_chunksize(l, n):
"""Rudimentary function to calculate the size of chunks for equal
distribution of a list `l` among `n` workers.
"""
return int(math.ceil(len(l)/float(n)))
# Build boundaries (indices for slicing) for chunks of `INPUT` list.
chunk_size = calc_chunksize(INPUT, n_workers)
chunk_boundaries = [
(i, i+chunk_size) for i in xrange(0, len(INPUT), chunk_size)]
# When n_workers and input list size are of same order of magnitude,
# the above method might have created less chunks than workers available.
if n_workers != len(chunk_boundaries):
return None
result_queue = multiprocessing.Queue()
# Prepare child processes.
children = []
for worker_index in xrange(n_workers):
children.append(
multiprocessing.Process(
target=worker_function,
args=(
result_queue,
worker_index,
chunk_boundaries[worker_index],
)
)
)
# Run child processes.
for c in children:
c.start()
# Create result list of length of `INPUT`. Assign results upon arrival.
results = [None] * len(INPUT)
# Wait for all results to arrive.
for _ in xrange(n_workers):
worker_index, result_chunk = result_queue.get(block=True)
chunk_boundary = chunk_boundaries[worker_index]
# Store the chunk of results just received to the overall result list.
results[chunk_boundary[0]:chunk_boundary[1]] = result_chunk
# Join child processes (clean up zombies).
for c in children:
c.join()
return results
def main():
durations = []
n_children = [1, 2, 4]
for n in n_children:
print "Crunching input with %s child(ren)." % n
t0 = time.time()
result = work(n)
if result is None:
continue
duration = time.time() - t0
print "Result computed by %s child process(es): %s" % (n, result)
print "Duration: %.2f s" % duration
durations.append(duration)
normalized_durations = [durations[0]/d for d in durations]
for n, normdur in zip(n_children, normalized_durations):
print "%s-children speedup: %.2f" % (n, normdur)
if __name__ == '__main__':
main()
出力:
$ export OMP_NUM_THREADS=1
$ /usr/bin/time python test2.py
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 16.66 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 8.27 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 4.37 s
1-children speedup: 1.00
2-children speedup: 2.02
4-children speedup: 3.81
48.75user 1.75system 0:30.00elapsed 168%CPU (0avgtext+0avgdata 1007936maxresident)k
0inputs+8outputs (1major+809308minor)pagefaults 0swaps
$ export OMP_NUM_THREADS=4
$ /usr/bin/time python test2.py
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 8.62 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 4.92 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 2.95 s
1-children speedup: 1.00
2-children speedup: 1.75
4-children speedup: 2.92
106.72user 3.07system 0:17.19elapsed 638%CPU (0avgtext+0avgdata 1022240maxresident)k
0inputs+8outputs (1major+841915minor)pagefaults 0swaps
$ /usr/bin/time python test2.py
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [21762, 26806, 10148, 22947, 20900, 8161, 20168, 17439, 23497, 26360, 6789, 11216, 12769, 23022, 26221, 20480, 19140, 13757, 23692, 19541, 24644, 21251, 21000, 21687, 32187, 5639, 23314, 14678, 18289, 12493, 29766, 14987, 12580, 17988, 20853, 4572, 16538, 13284, 18612, 28617, 19017, 23145, 11183, 21018, 10922, 11709, 27895, 8981]
Duration: 12.69 s
4-children speedup: 1.00
174.03user 4.40system 0:14.23elapsed 1253%CPU (0avgtext+0avgdata 2887456maxresident)k
0inputs+8outputs (1major+1211632minor)pagefaults 0swaps
$ export OMP_NUM_THREADS=5
$ /usr/bin/time python test2.py
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [19528, 17575, 21792, 24303, 6352, 22422, 25338, 18183, 15895, 19644, 20161, 22556, 24657, 30571, 13940, 18891, 10866, 21363, 20585, 15289, 6732, 10851, 11492, 29146, 12611, 15022, 18967, 25171, 10759, 27283, 30413, 14519, 25456, 18934, 28445, 12768, 28152, 24055, 9285, 26834, 27731, 33398, 10172, 22364, 12117, 14967, 18498, 8111]
Duration: 13.08 s
4-children speedup: 1.00
230.16user 5.98system 0:14.77elapsed 1598%CPU (0avgtext+0avgdata 2898640maxresident)k
0inputs+8outputs (1major+1219611minor)pagefaults 0swaps