2

multiprocessing モジュールの使用時に pickle エラーが発生するという一般的な問題に遭遇しました。

私の正確な問題は、関数で呼び出す前に、呼び出している関数に何らかの状態を与える必要があることですpool.mapが、そうすると、ここでattribute lookup __builtin__.function failed見つかったエラーが発生します。

リンクされたSOの回答に基づいて、関数を使用する唯一の方法pool.mapは、定義された関数自体を呼び出して、現在の関数の範囲外で検索されるようにすることです。

上記の説明が不十分だったような気がするので、コードの問題を次に示します。:)

プールなしでのテスト

# Function to be called by the multiprocessing pool
def my_func(x):
    massive_list, medium_list, index1, index2 = x
    result = [massive_list[index1 + x][index2:] for x in xrange(10)]
    return result in medium_list



if __name__ == '__main__':

    data = [comprehension which loads a ton of state]
    source = [comprehension which also loads a medium amount of state]

    for num in range(100):
        to_crunch = ((massive_list, small_list, num, x) for x in range(1000)) 
        result = map(my_func, to_crunch)

これは A-OK で、期待どおりに機能します。それが「間違っている」唯一のことは、それが遅いということです。

プール試行 1

# (Note: my_func() remains the same)
if __name__ == '__main__':

    data = [comprehension which loads a ton of state]
    source = [comprehension which also loads a medium amount of state]

    pool = multiprocessing.Pool(2)
    for num in range(100):
        to_crunch = ((massive_list, small_list, num, x) for x in range(1000)) 
        result = pool.map(my_func, to_crunch)

これは技術的には機能しますが、驚くほど 18 倍遅くなります! スローダウンは、各呼び出しで 2 つの大規模なデータ構造をコピーするだけでなく、それらが渡されるときにそれらをピクル/アンピクルすることから来ているに違いありません。非プール バージョンは、実際のリストではなく、大規模なリストへの参照を渡すだけでよいというメリットがあります。

ボトルネックを突き止めたので、2 つの巨大なリストを state として に保存しようとしましたmy_func。そうすれば、私の理解が正しければ、ワーカーごとに 1 回 (私の場合は 4 回) コピーするだけで済みます。

プール試行 2:

my_func2 つのリストを格納された状態として渡すクロージャーで締めくくります。

def build_myfunc(m,s):
    def my_func(x):
        massive_list = m # close the state in there
        small_list = s

        index1, index2 = x
        result = [massive_list[index1 + x][index2:] for x in xrange(10)]
        return result in medium_list
    return my_func

if __name__ == '__main__':

    data = [comprehension which loads a ton of state]
    source = [comprehension which also loads a medium amount of state]

    modified_func = build_myfunc(data, source)

    pool = multiprocessing.Pool(2)
    for num in range(100):
        to_crunch = ((massive_list, small_list, num, x) for x in range(1000)) 
        result = pool.map(modified_func, to_crunch)

ただし、(上記のリンクされたSOの質問に基づいて)同じスコープ内からマルチプロセッシングで関数を呼び出すことはできないため、これはピクルエラーを返します。

エラー:

PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

それで、この問題を回避する方法はありますか?

4

1 に答える 1

0

マップはワークロードを分散する方法です。funcにデータを保存すると、当初の目的がなくなると思います。

遅い理由を探ってみましょう。それは正常ではなく、何か他のものがあるに違いありません。

まず、プロセスの数は、それらを実行するマシンに適している必要があります。あなたの例では、2 つのプロセスのプールを使用しているため、合計 3 つのプロセスが関係しています。使用しているシステムにはいくつのコアがありますか? 他に何が実行されていますか?データ処理中のシステム負荷は? 関数はデータに対して何をしますか? ディスクにアクセスしますか?または、DB を使用している可能性があります。つまり、ディスクとコアにアクセスする別のプロセスが存在する可能性があります。メモリはどうですか?初期リストを保存するのに十分ですか?

正しい実装は試行 1 です。

iostatたとえば、実行をプロファイルしてみてください。このようにして、ボトルネックを見つけることができます。

CPU で停止する場合は、コードを微調整してみてください。

Stackoverflow に関する別の回答から(私は問題なくコピーしてここに貼り付けます:P):

.map()結果を収集してから返すものを使用しています。したがって、大規模なデータセットの場合、おそらく収集段階で立ち往生しています。

結果の順序が重要でない場合.imap()イテレータのバージョンを使用してみてください (例からわかるように)。.map() .imap_unordered()

関連するドキュメントは次のとおりです次の行に注目してください。

非常に長い iterable の場合、chunksize に大きな値を使用すると、デフォルト値の 1 を使用するよりもはるかに速くジョブを完了することができます。

于 2013-10-11T09:28:10.247 に答える