20

Python でマルチプロセッサ プログラミングを試しています。Fibonacciたとえば、分割統治アルゴリズムを考えてみましょう。プログラムの実行フローはツリーのように分岐し、並列に実行されます。つまり、ネストされた並列処理の例があります。

Java から、スレッドプール パターンを使用してリソースを管理しました。これは、プログラムが非常に迅速に分岐し、存続期間の短いスレッドが多数作成される可能性があるためです。を介して、単一の静的 (共有) スレッドプールをインスタンス化できます ExecutorService

Poolについても同じことが予想されますが、Pool オブジェクトはグローバルに共有されないようです。たとえば、使用してプールを共有するmultiprocessing.Manager.Namespace()と、エラーが発生します。

プロセス間でプール オブジェクトを渡すことも、ピクルス化することもできません

2 部構成の質問があります。

  1. ここで何が欠けていますか。プロセス間でプールを共有すべきではないのはなぜですか?
  2. Python でネストされた並列処理を実装するためのパターンは何ですか? 可能であれば、再帰構造を維持し、反復のためにそれを交換しないでください。

from concurrent.futures import ThreadPoolExecutor

def fibonacci(n):
    if n < 2:
        return n
    a = pool.submit(fibonacci, n - 1)
    b = pool.submit(fibonacci, n - 2)
    return a.result() + b.result()

def main():
    global pool

    N = int(10)
    with ThreadPoolExecutor(2**N) as pool:
        print(fibonacci(N))

main()

ジャワ

public class FibTask implements Callable<Integer> {

    public static ExecutorService pool = Executors.newCachedThreadPool();
    int arg;

    public FibTask(int n) {
        this.arg= n;
    }

    @Override
    public Integer call() throws Exception {
        if (this.arg > 2) { 
            Future<Integer> left = pool.submit(new FibTask(arg - 1));
            Future<Integer> right = pool.submit(new FibTask(arg - 2));
            return left.get() + right.get();
        } else {
            return 1;
        }

    } 

  public static void main(String[] args) throws Exception {
      Integer n = 14;
      Callable<Integer> task = new FibTask(n);
      Future<Integer> result =FibTask.pool.submit(task); 
      System.out.println(Integer.toString(result.get()));
      FibTask.pool.shutdown();            
  }    

}

ここで問題になるかどうかはわかりませんが、「プロセス」と「スレッド」の違いを無視しています。私にとっては、どちらも「仮想化されたプロセッサ」を意味します。私の理解では、プールの目的は「プール」またはリソースを共有することです。実行中のタスクは、プールにリクエストを送信できます。並列タスクが他のスレッドで完了すると、それらのスレッドを回収して新しいタスクに割り当てることができます。プールの共有を禁止して、各スレッドが独自の新しいプールをインスタンス化する必要があるようにすることは、私には意味がありません。

4

2 に答える 2

5

1)ここで何が欠けていますか。プロセス間でプールを共有すべきではないのはなぜですか?

すべてのオブジェクト/インスタンスが選択可能/シリアル化可能であるとは限りません。この場合、プールは選択可能ではない threading.lock を使用します。

>>> import threading, pickle
>>> pickle.dumps(threading.Lock())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
[...]
  File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle lock objects

またはそれ以上:

>>> import threading, pickle
>>> from concurrent.futures import ThreadPoolExecutor
>>> pickle.dumps(ThreadPoolExecutor(1))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File 
[...]
"/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 306, in save
        rv = reduce(self.proto)
      File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex
        raise TypeError, "can't pickle %s objects" % base.__name__
    TypeError: can't pickle lock objects

考えてみると、ロックはオペレーティング システムによって管理されるセマフォ プリミティブです (Python はネイティブ スレッドを使用するため)。Python ランタイム内でそのオブジェクトの状態をピクルして保存することができても、その真の状態は OS によって保持されているため、意味のあることは何も達成されません。

2) Python でネストされた並列処理を実装するためのパターンは何ですか? 可能であれば、再帰構造を維持し、反復のためにトレードしない

さて、名声のために、プロセス(ProcessPoolExecutor)ではなくスレッド(ThreadPoolExecutor)を使用しているため、上記のすべてが実際にはあなたの例には当てはまりません。プロセス間でデータを共有する必要はありません。

使用しているスレッド プール (CachedThreadPool) が必要に応じて新しいスレッドを作成しているのに対し、Python executor の実装は制限されており、明示的な最大スレッド数 (max_workers) が必要であるため、Java の例はより効率的であるように見えます。言語間には少し構文の違いがあり、それがあなたを混乱させているように見えます (Python の静的インスタンスは本質的に明示的にスコープされていないものです) が、本質的に両方の例は、実行するためにまったく同じ数のスレッドを作成します。たとえば、Python でかなり素朴な CachedThreadPoolExecutor 実装を使用した例を次に示します。

from concurrent.futures import ThreadPoolExecutor

class CachedThreadPoolExecutor(ThreadPoolExecutor):
    def __init__(self):
        super(CachedThreadPoolExecutor, self).__init__(max_workers=1)

    def submit(self, fn, *args, **extra):
        if self._work_queue.qsize() > 0:
            print('increasing pool size from %d to %d' % (self._max_workers, self._max_workers+1))
            self._max_workers +=1

        return super(CachedThreadPoolExecutor, self).submit(fn, *args, **extra)

pool = CachedThreadPoolExecutor()

def fibonacci(n):
    print n
    if n < 2:
        return n
    a = pool.submit(fibonacci, n - 1)
    b = pool.submit(fibonacci, n - 2)
    return a.result() + b.result()

print(fibonacci(10))

性能調整:

スレッドのオーバーヘッドなしで高い同時実行性が得られるため、geventを検討することを強くお勧めします。これは常に当てはまるとは限りませんが、コードは実際には gevent の使用法のポスターの子です。次に例を示します。

import gevent

def fibonacci(n):
    print n
    if n < 2:
        return n
    a = gevent.spawn(fibonacci, n - 1)
    b = gevent.spawn(fibonacci, n - 2)
    return a.get()  + b.get()

print(fibonacci(10))

完全に非科学的ですが、私のコンピューターでは、上記のコードはスレッド化された同等のコードよりも 9 倍速く実行されます。

これが役立つことを願っています。

于 2013-06-17T04:47:05.790 に答える
1

1.ここで何が欠けていますか; プロセス間でプールを共有すべきではないのはなぜですか?

通常、言語に関係なく、プロセス間で OS スレッドを共有することはまったくできません。

プール マネージャーへのアクセスをワーカー プロセスと共有するように手配することもできますが、それはおそらく問題に対する適切な解決策ではありません。下記参照。

2. Python でネストされた並列処理を実装するためのパターンは何ですか? 可能であれば、再帰構造を維持し、反復のためにそれを交換しないでください。

これは、データに大きく依存します。

CPython では、一般的な答えは、効率的な並列操作を実装するデータ構造を使用することです。これの良い例は、NumPyの最適化された配列タイプです。これを使用して、大きな配列操作を複数のプロセッサ コアに分割する例を次に示します。

ただし、ブロッキング再帰を使用して実装されたフィボナッチ関数は、ワーカープールベースのアプローチには特に適していません。 fib(N) は、他のワーカーを待つ以外に何もせずに N 個のワーカーを結び付けることに多くの時間を費やします。フィボナッチ関数に具体的にアプローチする方法は他にもたくさんあります (たとえば、CPSを使用してブロッキングを排除し、一定数のワーカーを埋めるなど)、例よりも、解決する実際の問題に基づいて戦略を決定する方がよいでしょう。このような。

于 2013-06-17T09:44:22.753 に答える