4

コードのパフォーマンスに問題があります。ステップ# IIIIは何時間もかかります。以前はitertools.prodctを実体化していましたが、ユーザーのおかげで もう実行しpro_data = product(array_b,array_a)ません。これはメモリの問題を解決するのに役立ちましたが、それでも非常に時間がかかります。マルチスレッドまたはマルチプロセスで並列化したいと思います。あなたが提案できることは何でも、私は感謝しています。

説明。パーティクルのx値とy値を含む2つの配列があります。各粒子(2つの座標で定義)について、別の粒子で関数を計算したいと思います。組み合わせの場合、itertools.productメソッドを使用して、すべてのパーティクルをループします。合計で50000を超える粒子を実行しているため、計算するN * N/2の組み合わせがあります。

前もって感謝します

import numpy as np
import matplotlib.pyplot as plt
from itertools import product,combinations_with_replacement

def func(ar1,ar2,ar3,ar4): #example func that takes four arguments
  return (ar1*ar2**22+np.sin(ar3)+ar4)

def newdist(a):
  return func(a[0][0],a[0][1],a[1][0],a[1][1])    

x_edges = np.logspace(-3,1, num=25) #prepare x-axis for histogram 

x_mean = 10**((np.log10(x_edges[:-1])+np.log10(x_edges[1:]))/2)
x_width=x_edges[1:]-x_edges[:-1]

hist_data=np.zeros([len(x_edges)-1])

array1=np.random.uniform(0.,10.,100)
array2=np.random.uniform(0.,10.,100)

array_a = np.dstack((array1,array1))[0]
array_b = np.dstack((array2,array2))[0]
# IIII
for i in product(array_a,array_b):
  (result,bins) = np.histogram(newdist(i),bins=x_edges)
  hist_data+=result

hist_data = np.array(map(float, hist_data))
plt.bar(x_mean,hist_data,width=x_width,color='r')
plt.show()

-----編集-----私は今このコードを使用しました:

def mp_dist(array_a,array_b, d, bins): #d chunks AND processes
  def worker(array_ab, out_q):
      """ push result in queue """
      outdict = {}
      outdict = vec_chunk(array_ab, bins)
      out_q.put(outdict)
  out_q = mp.Queue()
  a = np.swapaxes(array_a, 0 ,1)
  b = np.swapaxes(array_b, 0 ,1)
  array_size_a=len(array_a)-(len(array_a)%d)
  array_size_b=len(array_b)-(len(array_b)%d)
  a_chunk = array_size_a / d
  b_chunk = array_size_b / d
  procs = []
  #prepare arrays for mp
  array_ab = np.empty((4, a_chunk, b_chunk))
  for j in xrange(d):
    for k in xrange(d):
      array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None]
      array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)]
      p = mp.Process(target=worker, args=(array_ab, out_q))
      procs.append(p)
      p.start()
  resultarray = np.empty(len(bins)-1)
  for i in range(d):
      resultarray+=out_q.get() 
  # Wait for all worker processes to finish
  for pro in procs:
      pro.join()
  print resultarray
  return resultarray

ここでの問題は、プロセスの数を制御できないことです。代わりにどのように使用できますmp.Pool()か?よりも

4

2 に答える 2

4

まず、問題の単純なベクトル化を見てみましょう。とはまったく同じ、つまり粒子の座標にしたいと思っていますarray_aが、ここではそれらを分離しています。array_b

タイミングを簡単にするために、コードを関数に変換しました。

def IIII(array_a, array_b, bins) :
    hist_data=np.zeros([len(bins)-1])
    for i in product(array_a,array_b):
        (result,bins) = np.histogram(newdist(i), bins=bins)
        hist_data+=result
    hist_data = np.array(map(float, hist_data))
    return hist_data

ちなみに、次のように、それほど複雑でない方法でサンプルデータを生成できます。

n = 100
array_a = np.random.uniform(0, 10, size=(n, 2))
array_b = np.random.uniform(0, 10, size=(n, 2))

したがって、最初にあなたのをベクトル化する必要がありますfunc。私はそれがどんなarray形でも取ることができるようにそれをし(4, ...)ました。メモリを節約するために、適切な場所で計算を実行し、最初のプレーン、つまりを返しarray[0]ます。

def func_vectorized(a) :
    a[1] **= 22
    np.sin(a[2], out=a[2])
    a[0] *= a[1]
    a[0] += a[2]
    a[0] += a[3]
    return a[0]

この関数を配置すると、次のベクトル化されたバージョンを記述できますIIII

def IIII_vec(array_a, array_b, bins) :
    array_ab = np.empty((4, len(array_a), len(array_b)))
    a = np.swapaxes(array_a, 0 ,1)
    b = np.swapaxes(array_b, 0 ,1)
    array_ab[[0, 1]] = a[:, :, None]
    array_ab[[2, 3]] = b[:, None, :]
    newdist = func_vectorized(array_ab)
    hist, _ = np.histogram(newdist, bins=bins)
    return hist

n = 100ポイントを使用すると、両方とも同じものを返します。

In [2]: h1 = IIII(array_a, array_b, x_edges)

In [3]: h2 = IIII_bis(array_a, array_b, x_edges)

In [4]: np.testing.assert_almost_equal(h1, h2)

しかし、タイミングの違いはすでに非常に関連しています。

In [5]: %timeit IIII(array_a, array_b, x_edges)
1 loops, best of 3: 654 ms per loop

In [6]: %timeit IIII_vec(array_a, array_b, x_edges)
100 loops, best of 3: 2.08 ms per loop

300倍のスピードアップ!より長いサンプルデータで再試行すると、、のようn = 1000に両方のスケーリングが同じように悪いn**2ことがわかります。したがって、300xはそこにとどまります。

In [10]: %timeit IIII(array_a, array_b, x_edges)
1 loops, best of 3: 68.2 s per loop

In [11]: %timeit IIII_bis(array_a, array_b, x_edges)
1 loops, best of 3: 229 ms per loop

だからあなたはまだ良い10分を見ています。現在のソリューションに必要な2日以上と比較すると、実際にはそれほど多くはありません。

もちろん、物事をとても素晴らしいものにするためには(4, 50000, 50000)、フロートの配列をメモリに収める必要があります。これは、私のシステムでは処理できません。ただし、チャンクで処理することにより、物事を比較的高速に保つことができます。次のバージョンのIIII_vecは、各配列をdチャンクに分割します。書かれているように、配列の長さは。で割り切れる必要がありますd。その制限を克服するのはそれほど難しいことではありませんが、それは本当の目的を曖昧にするでしょう:

def IIII_vec_bis(array_a, array_b, bins, d=1) :
    a = np.swapaxes(array_a, 0 ,1)
    b = np.swapaxes(array_b, 0 ,1)
    a_chunk = len(array_a) // d
    b_chunk = len(array_b) // d
    array_ab = np.empty((4, a_chunk, b_chunk))
    hist_data = np.zeros((len(bins) - 1,))
    for j in xrange(d) :
        for k in xrange(d) :
            array_ab[[0, 1]] = a[:, a_chunk * j:a_chunk * (j + 1), None]
            array_ab[[2, 3]] = b[:, None, b_chunk * k:b_chunk * (k + 1)]
            newdist = func_vectorized(array_ab)
            hist, _ = np.histogram(newdist, bins=bins)
            hist_data += hist
    return hist_data

まず、それが実際に機能することを確認しましょう。

In [4]: h1 = IIII_vec(array_a, array_b, x_edges)

In [5]: h2 = IIII_vec_bis(array_a, array_b, x_edges, d=10)

In [6]: np.testing.assert_almost_equal(h1, h2)

そして今、いくつかのタイミング。とn = 100

In [7]: %timeit IIII_vec(array_a, array_b, x_edges)
100 loops, best of 3: 2.02 ms per loop

In [8]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
100 loops, best of 3: 12 ms per loop

しかし、メモリ内にますます大きな配列が必要になると、それをチャンクで実行することで成果が得られ始めます。とn = 1000

In [12]: %timeit IIII_vec(array_a, array_b, x_edges)
1 loops, best of 3: 223 ms per loop

In [13]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
1 loops, best of 3: 208 ms per loop

配列なしでは呼び出すことn = 10000ができなくなりましたが、エラーが大きすぎますが、分厚いバージョンはまだ実行されています。IIII_vec

In [18]: %timeit IIII_vec_bis(array_a, array_b, x_edges, d=10)
1 loops, best of 3: 21.8 s per loop

そして、それができることを示すために、私はそれを一度実行しましたn = 50000

In [23]: %timeit -n1 -r1 IIII_vec_bis(array_a, array_b, x_edges, d=50)
1 loops, best of 1: 543 s per loop

したがって、9分間の数値計算は、25億回のインタラクションを計算したことを考えると、それほど悪くはありません。

于 2013-03-07T18:07:09.103 に答える
4

ベクトル化された numpy 操作を使用します。を使用して引数を作成することにより、for ループをproduct()単一の呼び出しに置き換えます。newdist()meshgrid()

のサブブロックに対応するnewdist()のスライスでarray_a問題の計算を並列化します。これは、スライスとマルチプロセッシングを使用した例です。array_bmeshgrid()

手順を示す別の例を次に示します: python ループ -> ベクトル化された numpy バ​​ージョン -> 並列:

#!/usr/bin/env python
from __future__ import division
import math
import multiprocessing as mp
import numpy as np

try:
    from itertools import izip as zip
except ImportError:
    zip = zip # Python 3

def pi_loop(x, y, npoints):
    """Compute pi using Monte-Carlo method."""
    #  note: the method converges to pi very slowly.
    return 4 * sum(1 for xx, yy in zip(x, y) if (xx**2 + yy**2) < 1) / npoints

def pi_vectorized(x, y, npoints):
    return 4 * ((x**2 + y**2) < 1).sum() / npoints # or just .mean()

def mp_init(x_shared, y_shared):
    global mp_x, mp_y
    mp_x, mp_y = map(np.frombuffer, [x_shared, y_shared]) # no copy

def mp_pi(args):
    # perform computations on slices of mp_x, mp_y
    start, end = args
    x = mp_x[start:end] # no copy
    y = mp_y[start:end]
    return ((x**2 + y**2) < 1).sum()

def pi_parallel(x, y, npoints):
    # compute pi using multiple processes
    pool = mp.Pool(initializer=mp_init, initargs=[x, y])
    step = 100000
    slices = ((start, start + step) for start in range(0, npoints, step))
    return 4 * sum(pool.imap_unordered(mp_pi, slices)) / npoints

def main():
    npoints = 1000000

    # create shared arrays
    x_sh, y_sh = [mp.RawArray('d', npoints) for _ in range(2)]

    # initialize arrays
    x, y = map(np.frombuffer, [x_sh, y_sh])
    x[:] = np.random.uniform(size=npoints)
    y[:] = np.random.uniform(size=npoints)

    for f, a, b in [(pi_loop, x, y), 
                    (pi_vectorized, x, y), 
                    (pi_parallel, x_sh, y_sh)]:
        pi = f(a, b, npoints)
        precision = int(math.floor(math.log10(npoints)) / 2 - 1 + 0.5)
        print("%.*f %.1e" % (precision + 1, pi, abs(pi - math.pi)))

if __name__=="__main__":
    main()

の時間パフォーマンスnpoints = 10_000_000:

pi_loop pi_vectorized pi_parallel 
   32.6         0.159       0.069 # seconds

主なパフォーマンス上の利点は、python ループをそのベクトル化された numpy アナログに変換することによるものであることを示しています。

于 2013-03-07T14:25:02.690 に答える