22

multiprocessing.PoolとをいじっていますNumpyが、重要な点を見逃しているようです。poolバージョンが遅いのはなぜですか?調べたところhtop、いくつかのプロセスが作成されていることがわかりますが、それらはすべて CPU の 1 つを共有しており、合計すると最大 100% になります。

$ cat test_multi.py 
import numpy as np
from timeit import timeit
from multiprocessing import Pool


def mmul(matrix):
    for i in range(100):
        matrix = matrix * matrix
    return matrix

if __name__ == '__main__':
    matrices = []
    for i in range(4):
        matrices.append(np.random.random_integers(100, size=(1000, 1000)))

    pool = Pool(8)
    print timeit(lambda: map(mmul, matrices), number=20)
    print timeit(lambda: pool.map(mmul, matrices), number=20)

$ python test_multi.py 
16.0265390873
19.097837925

[アップデート]

  • timeitベンチマーク プロセス用に変更
  • いくつかのコアを含む初期化プール
  • より多くの計算とより少ないメモリ転送があるように計算を変更しました(私は願っています)

まだ変化なし。poolバージョンはまだ遅く、htop1 つのコアしか使用されておらず、複数のプロセスが生成されていることがわかります。

【アップデート2】

現時点では、@ Jan-Philip Gehrcke の and の使用に関する提案について読んmultiprocessing.Process()でいQueueます。しかし、それまでの間、私は知りたいです:

  1. 私の例がティアゴでうまくいくのはなぜですか? 私のマシン1で動作しない理由は何ですか?
  2. 私のコード例では、プロセス間のコピーはありますか? 私は自分のコードで、各スレッドに行列リストの 1 つの行列を与えることを意図していました。
  3. を使用しているため、私のコードは悪い例Numpyですか?

多くの場合、他の人が私の最終目標を知っていると、より良い答えが得られることがわかりました。私にはたくさんのファイルがあり、それらは atm ロードされ、シリアル方式で処理されます。処理は CPU を集中的に使用するため、並列化によって多くのことが得られると思います。私の目的は、ファイルを並行して分析する python 関数を呼び出すことです。さらに、この関数は単なる C コードへのインターフェースであり、それが違いを生んでいると思います。

1 Ubuntu 12.04、Python 2.7.3、i7 860 @ 2.80 - 詳細情報が必要な場合はコメントを残してください。

[アップデート3]

Stefano のサンプル コードの結果を次に示します。なぜかスピードが出ない。:/

testing with 16 matrices
base  4.27
   1  5.07
   2  4.76
   4  4.71
   8  4.78
  16  4.79
testing with 32 matrices
base  8.82
   1 10.39
   2 10.58
   4 10.73
   8  9.46
  16  9.54
testing with 64 matrices
base 17.38
   1 19.34
   2 19.62
   4 19.59
   8 19.39
  16 19.34

[更新 4] Jan-Philip Gehrcke のコメントへの回答

申し訳ありませんが、私は自分自身を明確にしませんでした。Update 2 で書いたように、私の主な目標は、サードパーティの Python ライブラリ関数の多数のシリアル呼び出しを並列化することです。この関数は、一部の C コードへのインターフェイスです。を使用するように勧められましたPoolが、これではうまくいきませんでしたnumpy。しかし、「並列化が難しい」ように見えても、パフォーマンスの向上を達成できませんでした。だから私は何か重要なことを見逃していたに違いないと思います。この情報は、この質問と報奨金で私が探しているものです。

【アップデート5】

多大なご意見をお寄せいただきありがとうございます。しかし、あなたの回答を読んでも、私にとってさらに多くの質問が生じるだけです。そのため、基本について読み、知らないことをより明確に理解できるようになったら、新しい SO の質問を作成します。

4

8 に答える 8

21

すべてのプロセスが同じ CPU で実行されているという事実については、こちらの回答を参照してください

インポート中numpyに、親プロセスの CPU アフィニティを変更します。Poolこれにより、生成されたすべてのワーカー プロセスを後で使用するときに、マシンで使用可能なすべてのコアを使用するのではなく、同じコアを求めて競合することになります。

すべてのコアが使用されるように、tasksetインポート後に呼び出して CPU アフィニティをリセットできます。numpy

import numpy as np
import os
from timeit import timeit
from multiprocessing import Pool


def mmul(matrix):
    for i in range(100):
        matrix = matrix * matrix
    return matrix

if __name__ == '__main__':

    matrices = []
    for i in range(4):
        matrices.append(np.random.random_integers(100, size=(1000, 1000)))

    print timeit(lambda: map(mmul, matrices), number=20)

    # after importing numpy, reset the CPU affinity of the parent process so
    # that it will use all cores
    os.system("taskset -p 0xff %d" % os.getpid())

    pool = Pool(8)
    print timeit(lambda: pool.map(mmul, matrices), number=20)

出力:

    $ python tmp.py                                     
    12.4765810966
    pid 29150's current affinity mask: 1
    pid 29150's new affinity mask: ff
    13.4136221409

topこのスクリプトの実行中にCPU 使用率を監視すると、「並列」部分の実行時にすべてのコアが使用されていることがわかります。他の人が指摘しているように、あなたの元の例では、データのピッキング、プロセスの作成などに伴うオーバーヘッドは、おそらく並列化による利点を上回っています。

編集:単一のプロセスが一貫して高速に見える理由の一部はnumpy、ジョブが複数のコアに分散している場合に使用できない要素単位の行列乗算を高速化するためのいくつかのトリックがある可能性があるためだと思います。

たとえば、通常の Python リストを使用してフィボナッチ数列を計算すると、並列化によって大幅なスピードアップが得られます。同様に、ベクトル化を利用しない方法で要素単位の乗算を行うと、並列バージョンで同様のスピードアップが得られます。

import numpy as np
import os
from timeit import timeit
from multiprocessing import Pool

def fib(dummy):
    n = [1,1]
    for ii in xrange(100000):
        n.append(n[-1]+n[-2])

def silly_mult(matrix):
    for row in matrix:
        for val in row:
            val * val

if __name__ == '__main__':

    dt = timeit(lambda: map(fib, xrange(10)), number=10)
    print "Fibonacci, non-parallel: %.3f" %dt

    matrices = [np.random.randn(1000,1000) for ii in xrange(10)]
    dt = timeit(lambda: map(silly_mult, matrices), number=10)
    print "Silly matrix multiplication, non-parallel: %.3f" %dt

    # after importing numpy, reset the CPU affinity of the parent process so
    # that it will use all CPUS
    os.system("taskset -p 0xff %d" % os.getpid())

    pool = Pool(8)

    dt = timeit(lambda: pool.map(fib,xrange(10)), number=10)
    print "Fibonacci, parallel: %.3f" %dt

    dt = timeit(lambda: pool.map(silly_mult, matrices), number=10)
    print "Silly matrix multiplication, parallel: %.3f" %dt

出力:

$ python tmp.py
Fibonacci, non-parallel: 32.449
Silly matrix multiplication, non-parallel: 40.084
pid 29528's current affinity mask: 1
pid 29528's new affinity mask: ff
Fibonacci, parallel: 9.462
Silly matrix multiplication, parallel: 12.163
于 2013-03-26T21:09:54.363 に答える
17

ここでは、通信のオーバーヘッドと計算の高速化の間の予測できない競争が間違いなく問題になっています。あなたが観察しているものは完全に素晴らしいです。正味のスピードアップが得られるかどうかは多くの要因に依存し、(あなたがしたように)適切に定量化する必要があるものです。

では、なぜmultiprocessingあなたの場合は「予想外に遅い」のでしょうか。 multiprocessingと関数はmapmap_async実際には、親プロセスと子プロセスを接続するパイプを介してPythonオブジェクトを前後にピクルスします。これにはかなりの時間がかかる場合があります。その間、子プロセスはほとんど何もする必要がありません。これはで表示されhtopます。異なるシステム間では、パイプトランスポートのパフォーマンスにかなりの違いがある可能性があります。そのため、一部の人にとっては、プールコードが単一のCPUコードよりも高速ですが、そうではありません(他の要因がここで関係する可能性がありますが、これは効果を説明するための例)。

それを速くするためにあなたは何ができますか?

  1. POSIX準拠のシステムでは入力をピクルスにしないでください。

    Unixを使用している場合は、POSIXのプロセスフォークの動作(書き込み時にメモリをコピーする)を利用して、親子通信のオーバーヘッドを回避できます。

    グローバルにアクセス可能な変数の親プロセスで作業するジョブ入力(たとえば、大きなマトリックスのリスト)を作成します。次に、自分自身を呼び出してワーカープロセスを作成しmultiprocessing.Process()ます。子では、グローバル変数からジョブ入力を取得します。簡単に言えば、これにより、子は通信のオーバーヘッドなしで親のメモリにアクセスできます(*、以下の説明)。たとえば、を介して結果を親に送り返しmultiprocessing.Queueます。これにより、特に出力が入力に比べて小さい場合に、通信のオーバーヘッドを大幅に節約できます。multiprocessing.Process()親の状態を継承しないまったく新しいPythonプロセスが作成されるため、このメソッドはWindowsなどでは機能しません。

  2. numpyマルチスレッドを利用してください。 実際の計算タスクによっては、関与multiprocessingしてもまったく役に立たない場合があります。numpyを自分でコンパイルし、OpenMPディレクティブを有効にすると、ラージマトリックスでの操作は、それ自体で非常に効率的にマルチスレッド化される可能性があります(そして、多くのCPUコアに分散されます。GILはここでは制限要因ではありません)。基本的に、これはnumpy/scipyのコンテキストで取得できる複数のCPUコアの最も効率的な使用法です。

*一般的に、子供は親の記憶に直接アクセスすることはできません。ただし、その後fork()、親と子は同等の状態になります。親のメモリ全体をRAM内の別の場所にコピーするのはばかげています。そのため、コピーオンライトの原則が採用されています。子がメモリの状態を変更しない限り、実際には親のメモリにアクセスします。変更があった場合にのみ、対応するビットとピースが子のメモリ空間にコピーされます。

主な編集:

複数のワーカープロセスで大量の入力データを処理し、「1。POSIX準拠のシステムで入力を選択しないでください」というアドバイスに従うコードを追加しましょう。さらに、ワーカーマネージャー(親プロセス)に転送される情報の量は非常に少ないです。この例の重い計算部分は、単一値分解です。OpenMPを多用することができます。この例を複数回実行しました。

  • 1つ、2つ、または4つのワーカープロセスとを使用するとOMP_NUM_THREADS=1、各ワーカープロセスは100%の最大負荷を作成します。そこでは、前述の労働者数-計算時間のスケーリング動作はほぼ線形であり、正味の高速化係数は関与する労働者の数に対応します。
  • 1つ、2つ、または4つのワーカープロセスとOMP_NUM_THREADS=4を使用すると、各プロセスが最大400%の負荷を作成します(4つのOpenMPスレッドを生成することにより)。私のマシンには16個の実際のコアがあるため、それぞれ最大400%の負荷を持つ4つのプロセスで、マシンからほぼ最大のパフォーマンスが得られます。スケーリングは完全に線形ではなくなり、スピードアップ係数は関係するワーカーの数ではありませんが、絶対計算時間OMP_NUM_THREADS=1はワーカープロセスの数に比べて大幅に短縮され、時間は依然として大幅に短縮されます。
  • より大きな入力データ、4コア、およびOMP_NUM_THREADS=4。その結果、平均システム負荷は1253%になります。
  • 前回と同じ設定で1回ですが、OMP_NUM_THREADS=5。その結果、平均システム負荷は1598%になります。これは、16コアのマシンからすべてを取得したことを示しています。ただし、実際の計算の実時間は、後者の場合と比較して改善されません。

コード:

import os
import time
import math
import numpy as np
from numpy.linalg import svd as svd
import multiprocessing


# If numpy is compiled for OpenMP, then make sure to control
# the number of OpenMP threads via the OMP_NUM_THREADS environment
# variable before running this benchmark.


MATRIX_SIZE = 1000
MATRIX_COUNT = 16


def rnd_matrix():
    offset = np.random.randint(1,10)
    stretch = 2*np.random.rand()+0.1
    return offset + stretch * np.random.rand(MATRIX_SIZE, MATRIX_SIZE)


print "Creating input matrices in parent process."
# Create input in memory. Children access this input.
INPUT = [rnd_matrix() for _ in xrange(MATRIX_COUNT)]


def worker_function(result_queue, worker_index, chunk_boundary):
    """Work on a certain chunk of the globally defined `INPUT` list.
    """
    result_chunk = []
    for m in INPUT[chunk_boundary[0]:chunk_boundary[1]]:
        # Perform single value decomposition (CPU intense).
        u, s, v = svd(m)
        # Build single numeric value as output.
        output =  int(np.sum(s))
        result_chunk.append(output)
    result_queue.put((worker_index, result_chunk))


def work(n_workers=1):
    def calc_chunksize(l, n):
        """Rudimentary function to calculate the size of chunks for equal 
        distribution of a list `l` among `n` workers.
        """
        return int(math.ceil(len(l)/float(n)))

    # Build boundaries (indices for slicing) for chunks of `INPUT` list.
    chunk_size = calc_chunksize(INPUT, n_workers)
    chunk_boundaries = [
        (i, i+chunk_size) for i in xrange(0, len(INPUT), chunk_size)]

    # When n_workers and input list size are of same order of magnitude,
    # the above method might have created less chunks than workers available. 
    if n_workers != len(chunk_boundaries):
        return None

    result_queue = multiprocessing.Queue()
    # Prepare child processes.
    children = []
    for worker_index in xrange(n_workers):
        children.append(
            multiprocessing.Process(
                target=worker_function,
                args=(
                    result_queue,
                    worker_index,
                    chunk_boundaries[worker_index],
                    )
                )
            )

    # Run child processes.
    for c in children:
        c.start()

    # Create result list of length of `INPUT`. Assign results upon arrival.
    results = [None] * len(INPUT)

    # Wait for all results to arrive.
    for _ in xrange(n_workers):
        worker_index, result_chunk = result_queue.get(block=True)
        chunk_boundary = chunk_boundaries[worker_index]
        # Store the chunk of results just received to the overall result list.
        results[chunk_boundary[0]:chunk_boundary[1]] = result_chunk

    # Join child processes (clean up zombies).
    for c in children:
        c.join()
    return results


def main():
    durations = []
    n_children = [1, 2, 4]
    for n in n_children:
        print "Crunching input with %s child(ren)." % n
        t0 = time.time()
        result = work(n)
        if result is None:
            continue
        duration = time.time() - t0
        print "Result computed by %s child process(es): %s" % (n, result)
        print "Duration: %.2f s" % duration
        durations.append(duration)
    normalized_durations = [durations[0]/d for d in durations]
    for n, normdur in zip(n_children, normalized_durations):
        print "%s-children speedup: %.2f" % (n, normdur)


if __name__ == '__main__':
    main()

出力:

$ export OMP_NUM_THREADS=1
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 16.66 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 8.27 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 4.37 s
1-children speedup: 1.00
2-children speedup: 2.02
4-children speedup: 3.81
48.75user 1.75system 0:30.00elapsed 168%CPU (0avgtext+0avgdata 1007936maxresident)k
0inputs+8outputs (1major+809308minor)pagefaults 0swaps

$ export OMP_NUM_THREADS=4
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 8.62 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 4.92 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 2.95 s
1-children speedup: 1.00
2-children speedup: 1.75
4-children speedup: 2.92
106.72user 3.07system 0:17.19elapsed 638%CPU (0avgtext+0avgdata 1022240maxresident)k
0inputs+8outputs (1major+841915minor)pagefaults 0swaps

$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [21762, 26806, 10148, 22947, 20900, 8161, 20168, 17439, 23497, 26360, 6789, 11216, 12769, 23022, 26221, 20480, 19140, 13757, 23692, 19541, 24644, 21251, 21000, 21687, 32187, 5639, 23314, 14678, 18289, 12493, 29766, 14987, 12580, 17988, 20853, 4572, 16538, 13284, 18612, 28617, 19017, 23145, 11183, 21018, 10922, 11709, 27895, 8981]
Duration: 12.69 s
4-children speedup: 1.00
174.03user 4.40system 0:14.23elapsed 1253%CPU (0avgtext+0avgdata 2887456maxresident)k
0inputs+8outputs (1major+1211632minor)pagefaults 0swaps

$ export OMP_NUM_THREADS=5
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [19528, 17575, 21792, 24303, 6352, 22422, 25338, 18183, 15895, 19644, 20161, 22556, 24657, 30571, 13940, 18891, 10866, 21363, 20585, 15289, 6732, 10851, 11492, 29146, 12611, 15022, 18967, 25171, 10759, 27283, 30413, 14519, 25456, 18934, 28445, 12768, 28152, 24055, 9285, 26834, 27731, 33398, 10172, 22364, 12117, 14967, 18498, 8111]
Duration: 13.08 s
4-children speedup: 1.00
230.16user 5.98system 0:14.77elapsed 1598%CPU (0avgtext+0avgdata 2898640maxresident)k
0inputs+8outputs (1major+1219611minor)pagefaults 0swaps
于 2013-03-14T17:05:16.117 に答える
4

あなたのコードは正しいです。自分のシステム (2 コア、ハイパースレッディング) で実行したところ、次の結果が得られました。

$ python test_multi.py 
30.8623809814
19.3914041519

プロセスと、予想通り、ほぼ 100% で動作しているいくつかのプロセスを示す並列部分を調べました。これは、システムまたは python インストールに含まれている必要があります。

于 2013-03-14T16:56:45.860 に答える
2

算術スループットの測定は非常に難しい作業です。基本的に、テスト ケースは単純すぎて、多くの問題が見られます。

まず、整数演算をテストしています。特別な理由はありますか? 浮動小数点を使用すると、多くの異なるアーキテクチャ間で比較可能な結果が得られます。

2番目matrix = matrix*matrixは入力パラメータを上書きし(行列は値ではなく参照によって渡されます)、各サンプルは異なるデータで動作する必要があります...

一般的な傾向を把握するために、最後のテストは、問題のサイズとワーカーの数のより広い範囲で実施する必要があります。

だからここに私の変更されたテストスクリプトがあります

import numpy as np
from timeit import timeit
from multiprocessing import Pool

def mmul(matrix):
    mymatrix = matrix.copy()
    for i in range(100):
        mymatrix *= mymatrix
    return mymatrix

if __name__ == '__main__':

    for n in (16, 32, 64):
        matrices = []
        for i in range(n):
            matrices.append(np.random.random_sample(size=(1000, 1000)))

        stmt = 'from __main__ import mmul, matrices'
        print 'testing with', n, 'matrices'
        print 'base',
        print '%5.2f' % timeit('r = map(mmul, matrices)', setup=stmt, number=1)

        stmt = 'from __main__ import mmul, matrices, pool'
        for i in (1, 2, 4, 8, 16):
            pool = Pool(i)
            print "%4d" % i, 
            print '%5.2f' % timeit('r = pool.map(mmul, matrices)', setup=stmt, number=1)
            pool.close()
            pool.join()

そして私の結果:

$ python test_multi.py 
testing with 16 matrices
base  5.77
   1  6.72
   2  3.64
   4  3.41
   8  2.58
  16  2.47
testing with 32 matrices
base 11.69
   1 11.87
   2  9.15
   4  5.48
   8  4.68
  16  3.81
testing with 64 matrices
base 22.36
   1 25.65
   2 15.60
   4 12.20
   8  9.28
  16  9.04

[更新] この例を自宅の別のコンピューターで実行すると、一貫した速度低下が得られました。

testing with 16 matrices
base  2.42
   1  2.99
   2  2.64
   4  2.80
   8  2.90
  16  2.93
testing with 32 matrices
base  4.77
   1  6.01
   2  5.38
   4  5.76
   8  6.02
  16  6.03
testing with 64 matrices
base  9.92
   1 12.41
   2 10.64
   4 11.03
   8 11.55
  16 11.59

誰のせいなのかわからないことを告白しなければなりません(numpy、python、コンパイラ、カーネル)...

于 2013-03-22T09:01:01.747 に答える
2

デフォルトでPoolは、n プロセスのみを使用します。n はマシン上の CPU の数です。のように、使用するプロセスの数を指定する必要がありますPool(5)

詳しくはこちら

于 2013-03-14T16:15:15.537 に答える
1

たくさんのファイルがあるとおっしゃっているので、次の解決策をお勧めします。

  • ファイル名のリストを作成します。
  • 入力パラメーターとして指定された単一のファイルをロードして処理する関数を作成します。
  • Pool.map()関数をファイルのリストに適用するために使用します。

すべてのインスタンスが独自のファイルをロードするようになったため、渡される唯一のデータはファイル名であり、(潜在的に大きい) numpy 配列ではありません。

于 2013-03-24T12:08:12.247 に答える