3

Python での並列処理に関する簡単な質問です。大きな共有データ構造があり、それに多くの関数を並行して適用したいとしましょう。これらの関数はデータ構造に対して読み取り専用ですが、結果オブジェクトでミューテーションを実行します。

def compute_heavy_task(self):
    big_shared_object = self.big_shared_object
    result_refs = self.result_refs
    for ref in result_refs:
         some_expensive_task(ref, big_shared_object)

一度に 5 つ、または一度に 10 など、これらを並行して行うにはどうすればよいでしょうか。一度に処理できるプロセッサの数はどうですか?

4

1 に答える 1

4

Python のスレッドでこれを有効に行うことはできません (少なくとも、おそらく使用している CPython 実装ではそうではありません)。グローバル インタープリター ロックは、8 コアのうち 800% 近くの効率ではなく、90% しか得られないことを意味します。

ただし、これは別のプロセスで行うことができます。これには、標準ライブラリに組み込まれている と の 2 つのオプションがありconcurrent.futuresますmultiprocessing。一般に、futures単純なケースでは単純であり、多くの場合、構成が容易です。multiprocessing一般に、より柔軟で強力です。futuresまた、Python 3.2 以降にのみ付属していますが、 PyPI には 2.5-3.1 のバックポートがあります。

柔軟性が必要なケースの 1 つはmultiprocessing、大きな共有データ構造がある場合です。詳細については、プロセス間の状態の共有と、そのすぐ上、下、およびそこからリンクされているセクションを参照してください。

int の巨大な配列のように、データ構造が非常に単純な場合、これは非常に単純です。

class MyClass(object):
    def __init__(self, giant_iterator_of_ints):
        self.big_shared_object = multiprocessing.Array('i', giant_iterator_of_ints)
    def compute_heavy_task(self):
        lock = multiprocessing.Lock()
        def subtask(my_range):
            return some_expensive_task(self.big_shared_object, lock, my_range)
        pool = multiprocessing.pool.Pool(5)
        my_ranges = split_into_chunks_appropriately(len(self.big_shared_object)
        results = pool.map_async(subtask, my_ranges)
        pool.close()
        pool.join()

some_expensive_task関数はロック オブジェクトを取得するようになったことに注意してください。共有オブジェクトへのすべてのアクセス (または、より頻繁には、1 つ以上のアクセスで構成されるすべての "トランザクション") でロックを取得する必要があります。ロック規則は扱いにくい場合がありますが、直接データ共有を使用する場合は、これを回避する方法はありません。

がかかることにも注意してくださいmy_range。同じオブジェクトに対して同じ関数を 5 回呼び出すと、同じことを 5 回実行することになり、あまり役に立ちません。物事を並列化する一般的な方法の 1 つは、各タスクにデータ セット全体のサブ範囲を与えることです。(通常は説明が簡単であるだけでなく、適切な種類のアルゴリズムを使用してこれに注意すれば、この方法で多くのロックを回避することさえできます。)

代わりに、一連の異なる関数を同じデータセットにマップしたい場合は、単にsome_expensive_task繰り返し使用するのではなく、作業する関数のコレクションが明らかに必要です。次に、たとえば、これらの関数を繰り返し呼び出しapply_asyncて、それぞれを呼び出すことができます。しかし、それを逆にすることもできます。データを囲むクロージャーとして、関数を受け取り、それをデータに適用する単一のアプライヤー関数を記述します。次に、map関数のコレクションに対するその関数だけです。

また、データ構造は で定義できるものであると想定しましたmultiprocessing.Array。そうでない場合は、C スタイルでデータ構造を設計し、それをctypes Arrayofとして実装するStructureか、その逆として実装し、それを使用する必要がありmultiprocessing.sharedctypesます。

また、結果オブジェクトを単に返される結果に移動しました。それらも巨大で共有する必要がある場合は、同じトリックを使用して共有可能にします。


これをさらに進める前に、本当にデータを共有する必要があるかどうかを自問する必要があります。このようにすると、デバッグ、パフォーマンス チューニングなどの時間の 80% を、ロックの追加と削除、ロックの粒度の増減などに費やすことになります。または、ファイル、データベース、またはその他のほぼすべての代替手段で作業し、80% をコードの残りの部分に使用できます。

于 2013-04-26T22:59:10.853 に答える