3

Pythonマルチプロセッシングと混同しています。

データベースから文字列を処理する関数を高速化しようとしていますが、関数が「通常の処理」よりもワーカーのプールに渡されると時間がかかるため、マルチプロセッシングがどのように機能するかを誤解したに違いありません。

これが私が達成しようとしていることの例です。

from time import clock, time
from multiprocessing import Pool, freeze_support

from random import choice


def foo(x):
    TupWerteMany = []
    for i in range(0,len(x)):
         TupWerte = []
          s = list(x[i][3])
          NewValue = choice(s)+choice(s)+choice(s)+choice(s)
          TupWerte.append(NewValue)
          TupWerte = tuple(TupWerte)

          TupWerteMany.append(TupWerte)
     return TupWerteMany



 if __name__ == '__main__':
     start_time = time()
     List = [(u'1', u'aa', u'Jacob', u'Emily'),
        (u'2', u'bb', u'Ethan', u'Kayla')]
     List1 = List*1000000

     # METHOD 1 : NORMAL (takes 20 seconds) 
     x2 = foo(List1)
     print x2[1:3]

     # METHOD 2 : APPLY_ASYNC (takes 28 seconds)
     #    pool = Pool(4)
     #    Werte = pool.apply_async(foo, args=(List1,))
     #    x2 = Werte.get()
     #    print '--------'
     #    print x2[1:3]
     #    print '--------'

     # METHOD 3: MAP (!! DOES NOT WORK !!)

     #    pool = Pool(4)
     #    Werte = pool.map(foo, args=(List1,))
     #    x2 = Werte.get()
     #    print '--------'
     #    print x2[1:3]
     #    print '--------'


     print 'Time Elaspse: ', time() - start_time

私の質問:

  1. apply_asyncが「通常の方法」よりも時間がかかるのはなぜですか?
  2. マップで何が間違っているのですか?
  3. マルチプロセッシングを使用してこのようなタスクを高速化することは理にかなっていますか?
  4. 最後に:ここで読んだ後、PythonでのマルチプロセッシングがWindowsで機能するかどうか疑問に思っていますか?
4

3 に答える 3

2

したがって、最初の問題は、で実際の並列処理が発生していないことfoo(x)です。リスト全体を関数に1回渡します。

1)プロセスプールの考え方は、いくつかのデータの別々のビットで計算を行う多くのプロセスを持つことです。

 # METHOD 2 : APPLY_ASYNC
 jobs = 4
 size = len(List1)
 pool = Pool(4)
 results = []
 # split the list into 4 equally sized chunks and submit those to the pool
 heads = range(size/jobs, size, size/jobs) + [size]
 tails = range(0,size,size/jobs)
 for tail,head in zip(tails, heads):
      werte = pool.apply_async(foo, args=(List1[tail:head],))
      results.append(werte)

 pool.close()
 pool.join() # wait for the pool to be done

 for result in results:
      werte = result.get() # get the return value from the sub jobs

これにより、実際のスピードアップは、各チャンクの処理にかかる時間がプロセスの起動にかかる時間よりも長い場合にのみ発生します。4つのプロセスと4つのジョブを実行する場合は、もちろん、これらのダイナミクスは次のように変化します。 4つのプロセスと100のジョブを実行する必要があります。完全に新しいPythonインタープリターを4回作成していることを忘れないでください。これは、無料ではありません。

2)マップに関する問題は、マップが個別のプロセスのすべてのfoo要素に適用されることです。これにはかなりの時間がかかります。List1したがって、プールに4つのプロセスmapがある場合、リストのアイテムを4回ポップし、処理するプロセスに送信します-プロセスが終了するのを待ちます-リストのさらにいくつかのものをポップします-プロセスが終了するのを待ちます。これは、たとえば、すべてのアイテムが1ギガバイトのテキストファイルを指すファイル名である場合など、単一のアイテムの処理に時間がかかる場合にのみ意味があります。しかし、現状では、マップはリストの1つの文字列を取得し、リストのスライスを取得するfoo場所に渡します。apply_async次のコードを試してください

def foo(thing):
    print thing

map(foo, ['a','b','c','d'])

これは組み込みのPythonマップであり、単一のプロセスを実行しますが、考え方はマルチプロセスバージョンでもまったく同じです。

JFSebastianのコメントに従って追加:ただし、chunksize引数を使用しmapて、各チャンクのおおよそのサイズを指定できます。

pool.map(foo, List1, chunksize=size/jobs) 

mapテストに利用できるものがないので、Windowsで問題があるかどうかはわかりません。

3)はい、あなたの問題が新しいpythonインタープリターをフォークすることを正当化するのに十分大きいことを考えると

4)コア/プロセッサの数などに依存するため、これについて明確な答えを出すことはできませんが、一般的にはWindowsでは問題ありません。

于 2012-08-24T20:24:24.560 に答える
0

質問(2)についてDougalとMattiの指導で、何が悪かったのかを理解しました。元のfoo関数はリストのリストを処理しますが、mapは単一の要素を処理する関数を必要とします。

新しい関数は

def foo2 (x):
    TupWerte = []
    s = list(x[3])
    NewValue = choice(s)+choice(s)+choice(s)+choice(s)
    TupWerte.append(NewValue)
    TupWerte = tuple(TupWerte)
    return TupWerte

そしてそれを呼び出すためのブロック:

jobs = 4
size = len(List1)
pool = Pool()
#Werte = pool.map(foo2, List1, chunksize=size/jobs)
Werte = pool.map(foo2, List1)
pool.close()
print Werte[1:3]

私がこれを理解するのを手伝ってくれた皆さんに感謝します。

すべてのメソッドの結果: リストの場合* 2 Mioレコード:通常13.3秒、非同期と並列:7.5秒、チャン​​クサイズが7.3のマップと並列、チャンクサイズなし5.2秒

于 2012-08-25T07:00:21.717 に答える
-2

興味がある場合は、一般的なマルチプロセッシングテンプレートを次に示します。

import multiprocessing as mp
import time

def worker(x):
    time.sleep(0.2)
    print "x= %s, x squared = %s" % (x, x*x)
    return x*x

def apply_async():
    pool = mp.Pool()
    for i in range(100):
        pool.apply_async(worker, args = (i, ))
    pool.close()
    pool.join()

if __name__ == '__main__':
    apply_async()

そして、出力は次のようになります。

x= 0, x squared = 0
x= 1, x squared = 1
x= 2, x squared = 4
x= 3, x squared = 9
x= 4, x squared = 16
x= 6, x squared = 36
x= 5, x squared = 25
x= 7, x squared = 49
x= 8, x squared = 64
x= 10, x squared = 100
x= 11, x squared = 121
x= 9, x squared = 81
x= 12, x squared = 144

ご覧のとおり、番号は非同期で実行されているため、順番に並んでいません。

于 2012-08-24T20:30:45.103 に答える