17

私は先物モジュールを初めて使用し、並列化の恩恵を受ける可能性のあるタスクを持っています。しかし、スレッドの関数とプロセスの関数をセットアップする方法を正確に理解できないようです。誰でもこの問題について何か助けていただければ幸いです。

粒子群最適化 (PSO)を実行しています。PSO 自体について詳しくは説明しませんが、コードの基本的なレイアウトは次のとおりです。

Particleメソッドを備えたクラスがありますgetFitness(self)(これは、いくつかのメトリックを計算し、それを に保存しますself.fitness)。PSO シミュレーションには、複数の粒子インスタンスがあります (10 を簡単に超えます。一部のシミュレーションでは 100 または 1000 ですらあります)。
ときどき、粒子の適合度を計算する必要があります。現在、私は for ループでこれを行います:

for p in listOfParticles:
  p.getFitness(args)

ただし、各粒子の適合度は互いに独立して計算できることに気付きました。これにより、このフィットネス計算は並列化の最有力候補になります。確かに、私はすることができmap(lambda p: p.getFitness(args), listOfParticles)ました。

今、私はこれを簡単に行うことができますfutures.ProcessPoolExecutor

with futures.ProcessPoolExecutor() as e:
  e.map(lambda p: p.getFitness(args), listOfParticles)

呼び出しの副作用はp.getFitness各パーティクル自体に格納されているため、 から返されることを心配する必要はありませんfutures.ProcessPoolExecutor()

ここまでは順調ですね。しかし今、私はそれが新しいプロセスを作成することに気付きましたProcessPoolExecutor。つまり、メモリをコピーするため、速度が低下します。メモリを共有できるようにしたいので、スレッドを使用する必要があります。それは良いことですが、各プロセス内で複数のスレッドを使用して複数のプロセスを実行する方が高速になる可能性が高いことに気付くまでは.

ここで問題が発生します。
これまでに見た例に基づいて、 は でThreadPoolExecutor動作しlistます。そうProcessPoolExecutorです。そのため、単一のオブジェクトを処理する必要があるため、反復的にProcessPoolExecutorファームアウトするThreadPoolExecutorことはできません(以下に投稿された私の試みを参照してください)。 一方で、必要なスレッドの数を把握するために独自の魔法を実行したいので、自分自身をスライスすることはできません。ThreadPoolExecutor
listOfParticlesThreadPoolExecutor

したがって、大きな問題 (ついに) :
プロセスとスレッドの両方を使用して以下を効果的に並列化できるように、コードをどのように構成すればよいでしょうか。

for p in listOfParticles:
  p.getFitness()

これは私が試してきたことですが、うまくいかないことがわかっているので、あえて実行しようとしません。

>>> def threadize(func, L, mw):
...     with futures.ThreadpoolExecutor(max_workers=mw) as executor:
...             for i in L:
...                     executor.submit(func, i)
... 

>>> def processize(func, L, mw):
...     with futures.ProcessPoolExecutor() as executor:
...             executor.map(lambda i: threadize(func, i, mw), L)
...

これを修正する方法、またはアプローチを改善する方法についての考えをいただければ幸いです

念のため、私は python3.3.2 を使用しています

4

3 に答える 3

19

問題を解決するためにプロセスとスレッドを組み合わせた実用的なコードを提供しますが、それはあなたが期待しているものではありません;-) 最初に、実際のデータを危険にさらさないモックプログラムを作成します. 無害なもので実験してください。それでは、ここから始めます。

class Particle:
    def __init__(self, i):
        self.i = i
        self.fitness = None
    def getfitness(self):
        self.fitness = 2 * self.i

今、私たちは遊ぶものを持っています。次のいくつかの定数:

MAX_PROCESSES = 3
MAX_THREADS = 2 # per process
CHUNKSIZE = 100

それらを味わってください。 CHUNKSIZE後で説明します。

あなたにとって最初の驚きは、私の最下位レベルのワーカー関数が何をするかです。それは、あなたがここで過度に楽観的だからです:

p.getFitness の呼び出しの副作用は各パーティクル自体に格納されるため、futures.ProcessPoolExecutor() からの戻り値について心配する必要はありません。

残念ながら、ワーカー プロセスで実行された処理は、メイン プログラムのインスタンスに影響を与えることはできません。Particleワーカー プロセスは、コピーParticleオン ライトの実装を介して、またはプロセス間で渡された pickle をfork()unpickle して作成されたコピーを操作しているため、インスタンスのコピーを操作します。Particle

したがって、メイン プログラムにフィットネス結果を表示させたい場合は、情報をメイン プログラムに送り返すように手配する必要があります。私はあなたの実際のプログラムについて十分に知らないので、ここではそれが一意の整数であり、メイン プログラムが整数をインスタンスParticle().iに簡単にマップできると仮定しています。Particleそれを念頭に置いて、ここでの最下位レベルのワーカー関数は、一意の整数と適合度の結果のペアを返す必要があります。

def thread_worker(p):
    p.getfitness()
    return (p.i, p.fitness)

そのため、 のリストをスレッド全体に広げて、結果Particleのリストを返すのは簡単です。(particle_id, fitness)

def proc_worker(ps):
    import concurrent.futures as cf
    with cf.ThreadPoolExecutor(max_workers=MAX_THREADS) as e:
        result = list(e.map(thread_worker, ps))
    return result

ノート:

  1. これは、各ワーカー プロセスが実行する関数です。
  2. 私はPython 3を使用しているので、リスト内のすべての結果list()を強制的に具体化するために使用します。e.map()
  3. コメントで述べたように、CPython では、CPU バウンドのタスクを複数のスレッドに分散させると、単一のスレッドですべてを実行するよりも遅くなります。

Particleあとは、 のリストをプロセス全体に広げて結果を取得するコードを書くだけです。これは で簡単に実行できるmultiprocessingので、これを使用します。それができるかどうかconcurrent.futuresはわかりませんが(スレッドも混在していることを考えると)、気にしません。しかし、私はあなたに動作するコードを提供しているので、あなたはそれで遊んで報告することができます ;-)

if __name__ == "__main__":
    import multiprocessing

    particles = [Particle(i) for i in range(100000)]
    # Note the code below relies on that particles[i].i == i
    assert all(particles[i].i == i for i in range(len(particles)))

    pool = multiprocessing.Pool(MAX_PROCESSES)
    for result_list in pool.imap_unordered(proc_worker,
                      (particles[i: i+CHUNKSIZE]
                       for i in range(0, len(particles), CHUNKSIZE))):
        for i, fitness in result_list:
            particles[i].fitness = fitness

    pool.close()
    pool.join()

    assert all(p.fitness == 2*p.i for p in particles)

ノート:

  1. sのリストをParticle「手作業で」チャンクに分割しています。それCHUNKSIZEがそのためです。これは、ワーカー プロセスが作業対象のs のリストを必要とするためであり、関数がそれを必要とするためです。関係なく作業をチャンクアップすることをお勧めします。そのため、呼び出しごとのインタープロセス オーバーヘッドと引き換えに、費用対効果が得られます。Particlefutures map()
  2. imap_unordered()結果が返される順序については保証しません。これにより、実装は可能な限り効率的に作業を配置する自由度が高まります。ここでは順序は気にしないので、問題ありません。
  3. ループは結果を取得し、それに応じてインスタンス(particle_id, fitness)を変更することに注意してください。Particleおそらく、あなたの本物.getfitnessはインスタンスに他の突然変異を起こしParticleます-推測できません。とにかく、メインプログラムは、「魔法によって」ワーカーで行われた変更を確認することはありません。明示的に調整する必要があります。制限内では、(particle_id, particle_instance)代わりにペアを返し、メイン プログラムのインスタンスを置き換えることができます。Particle次に、ワーカー プロセスで行われたすべての変更を反映します。

楽しむ :-)

先物はずっと下がる

交換は非常に簡単でしたmultiprocessing。変更点は次のとおりです。これも (前述のように)元のインスタンスを置き換えParticleて、すべてのミューテーションをキャプチャします。ただし、ここにはトレードオフがあります。インスタンスをピクルすると、単一の「適合性」結果をピクルするよりも「多くの」バイトが必要になります。より多くのネットワーク トラフィック。あなたの毒を選んでください;-)

変更されたインスタンスを返すにはthread_worker()、次のように の最後の行を置き換えるだけです。

return (p.i, p)

次に、すべての " main " ブロックを次のように置き換えます。

def update_fitness():
    import concurrent.futures as cf
    with cf.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as e:
        for result_list in e.map(proc_worker,
                      (particles[i: i+CHUNKSIZE]
                       for i in range(0, len(particles), CHUNKSIZE))):
            for i, p in result_list:
                particles[i] = p

if __name__ == "__main__":
    particles = [Particle(i) for i in range(500000)]
    assert all(particles[i].i == i for i in range(len(particles)))

    update_fitness()

    assert all(particles[i].i == i for i in range(len(particles)))
    assert all(p.fitness == 2*p.i for p in particles)

コードはmultiprocessorダンスと非常によく似ています。個人的には、価値があるmultiprocessingので、バージョンを使用します。imap_unorderedこれは単純化されたインターフェイスの問題です。多くの場合、有用な可能性を隠すことを犠牲にして単純化を購入します。

于 2013-11-25T01:01:58.923 に答える
4

まず、すべてのコアにプロセスをロードしながら、複数のスレッドを実行することを活用できますか? それがCPUバウンドの場合、ほとんどそうではありません。少なくともいくつかのテストを行う必要があります。

スレッドを追加することでパフォーマンスが向上する場合、次の問題は、手作りの負荷分散または自動でパフォーマンスを向上できるかどうかです。手作りとは、ワークロードを同様の計算上の複雑さのチャンクに慎重に分割し、チャンクごとに新しいタスク プロセッサを導入することを意味します。プロセス/スレッドのプールの自動作成と、新しいタスクのワーク キューでの通信によって、あなたが目指すものです。私の見解では、最初のアプローチは Apache Hadoop パラダイムの 1 つであり、2 番目のアプローチは Celery などのワークス キュー プロセッサによって実装されます。最初のアプローチでは、一部のタスクのチャンクが遅くなり、他のタスクが完了している間に実行されるという問題が発生する可能性があります。2 番目のアプローチでは、通信とタスク待機のオーバーヘッドが追加されます。これは、パフォーマンス テストの 2 番目のポイントです。

最後に、マルチスレッドを含むプロセスの静的コレクションが必要な場合は、知る限り、そのままでは達成できずconcurrent.futures、少し変更する必要があります。このタスクの既存のソリューションがあるかどうかはわかりませんがconcurrent、純粋な Python ソリューション (C コードなし) と同様に、簡単に実行できます。ワークプロセッサはクラスの_adjust_process_count ルーチンで定義されProcessPoolExecutorており、マルチスレッドアプローチでサブクラス化してオーバーライドするのはかなり簡単です。カスタムを提供するだけで_process_worker、に基づいてconcurrent.features.thread

参考までに原文ProcessPoolExecutor._adjust_process_count

def _adjust_process_count(self):
    for _ in range(len(self._processes), self._max_workers):
        p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue))
        p.start()
        self._processes[p.pid] = p
于 2013-11-15T06:56:01.157 に答える