1

Pythonコードでマルチプロセッシングを使用して、関数を非同期的に実行します。

import multiprocessing

po = multiprocessing.Pool()
for elements in a_list:
    results.append(po.apply_async(my_module.my_function, (some_arguments, elements, a_big_argument)))               
po.close()
po.join()
for r in results:
    a_new_list.add(r.get())

a_big_argument辞書です。私はそれを議論として与えます。それは10から100Moの間であるという意味で大きいです。それは、私のコードのパフォーマンスに大きな影響を与えているようです。

私のコードのパフォーマンスはこの新しい引数で実際に低下したので、私はおそらくここで愚かで効率的ではないことをしています。

大きな辞書を扱う最良の方法は何ですか?関数に毎回ロードしたくありません。代わりにデータベースを作成してそれに接続することは解決策でしょうか?

実行できるコードは次のとおりです。

'''
Created on Mar 11, 2013

@author: Antonin
'''

import multiprocessing
import random

# generate an artificially big dictionary
def generateBigDict():
    myBigDict = {}
    for key in range (0,1000000):
        myBigDict[key] = 1
    return myBigDict

def myMainFunction():
    # load the dictionary
    myBigDict = generateBigDict()
    # create a list on which we will asynchronously run the subfunction
    myList = []
    for list_element in range(0,20):
        myList.append(random.randrange(0,1000000))
    # an empty set to receive results
    set_of_results = set()
    # there is a for loop here on one of the arguments
    for loop_element in range(0,150):
        results = []
        # asynchronoulsy run the subfunction
        po = multiprocessing.Pool()
        for list_element in myList:
            results.append(po.apply_async(mySubFunction, (loop_element, list_element, myBigDict)))               
        po.close()
        po.join()
        for r in results:
            set_of_results.add(r.get())
    for element in set_of_results:
        print element

def mySubFunction(loop_element, list_element, myBigDict):
    import math
    intermediaryResult = myBigDict[list_element]
    finalResult = intermediaryResult + loop_element
    return math.log(finalResult)

if __name__ == '__main__':
    myMainFunction()
4

3 に答える 3

3

multiprocessing.Managerはそれをしていました。

import multiprocessing

manager = multiprocessing.Manager()
a_shared_big_dictionary = manager.dict(a_big_dictionary)

po = multiprocessing.Pool()
for elements in a_list:
    results.append(po.apply_async(my_module.my_function, (some_arguments, elements, a_shared_big_dictionary)))               
po.close()
po.join()
for r in results:
    a_new_list.add(r.get())

今、それははるかに高速です。

于 2013-03-13T21:04:36.317 に答える
1

Pythonマルチプロセッシングの質問の共有メモリオブジェクトへの回答を参照してください。

multiprocessing.Arrayを使用して配列をサブプロセスに渡すか、fork()を使用することをお勧めします。

于 2013-03-11T22:49:26.527 に答える
1

Poolメソッドの1つ(たとえば)に渡す引数はすべてapply_async、ピクルス化して、パイプを介してワーカープロセスに送信し、ワーカープロセスでピクルス解除する必要があります。このピクルス/パス/アンピクルプロセスは、特に大きなオブジェクトグラフがある場合、各ワーカープロセスが個別のコピーを作成する必要があるため、時間とメモリにコストがかかる可能性があります。

問題の正確な形状に応じて、これらのピクルスを回避するさまざまな方法があります。ワーカーは辞書を読み取るだけで書き込みは行わないため、関数から直接辞書を安全に参照し(つまり、に渡さないでapply_asyncfork()、ワーカープロセスでコピーを作成しないようにすることができます。

さらに良いことに、andを使用して検索する代わりに、引数としてmySubFunction()受け入れるように変更できます。(これはクロージャーで実行できる場合がありますが、pickleがクローズオーバーされたオブジェクトもコピーしようとしないかどうかは100%わかりません。)intermediaryResultlist_elementmyBigDictmyBigDict

または、dbmやsqliteなどの単純な永続化メソッドの1つなど、myBigDictすべてのプロセスが安全に共有できる場所に配置し、そこからワーカーにアクセスさせることもできます。

残念ながら、これらすべてのソリューションでは、タスク関数の形状を変更する必要があります。この「形を変える」ことを避けることが、人々が「本物の」CPUスレッドを好む理由の1つです。

于 2013-03-11T23:02:24.190 に答える