67

複数のプロセスで並行して処理したいデータの非常に大きな(読み取り専用)配列があります。

私はこのPool.map関数が好きで、それを使用してそのデータの関数を並行して計算したいと思います。

ValueまたはArrayクラスを使用して、プロセス間で共有メモリデータを使用できることを確認しました。しかし、これを使おうとするとRuntimeError: 'SynchronizedString objects should only be shared between processes through inheritance、Pool.map関数を使用すると次のようになります。

これが私がやろうとしていることの簡単な例です:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

誰かが私がここで間違っていることを教えてもらえますか?

したがって、私がやりたいのは、プロセスプールで作成された後、新しく作成された共有メモリに割り当てられた配列に関する情報をプロセスに渡すことです。

4

4 に答える 4

58

賞金を見たばかりなので、もう一度やり直してください;)

基本的に、エラーメッセージはそれが言ったことを意味すると思います-マルチプロセッシング共有メモリ配列は引数として渡すことができません(ピクルスによって)。データをシリアル化することは意味がありません。重要なのは、データが共有メモリであるということです。したがって、共有アレイをグローバルにする必要があります。私の最初の答えのように、モジュールの属性としてそれを置くのは良いことだと思いますが、あなたの例ではそれをグローバル変数として残すだけでもうまくいきます。フォークの前にデータを設定したくないというあなたのポイントを取り入れて、ここに修正された例があります。複数の可能な共有配列が必要な場合(そしてそれが引数としてtoShareを渡したい理由です)、同様に共有配列のグローバルリストを作成し、インデックスをcount_itに渡すことができます(これはになりfor c in toShare[i]:ます)。

from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

[編集:フォークを使用していないため、上記はWindowsでは機能しません。ただし、以下はWindowsでも機能し、プールを使用しているため、これが必要なものに最も近いと思います。

from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess,initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

マップが配列をピクルスしない理由はわかりませんが、プロセスとプールはピクルスします-おそらく、Windowsでのサブプロセスの初期化の時点で転送されたと思います。ただし、データはフォーク後も設定されていることに注意してください。

于 2009-11-12T12:39:03.877 に答える
7

データが読み取り専用の場合は、プールからフォークする前のモジュールでデータを変数にします。そうすれば、すべての子プロセスがそれにアクセスできるようになり、書き込みを行わない限り、コピーされません。

import myglobals # anything (empty .py file)
myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )

lock=Falseキーワード引数を使用して試すこともできますが、配列を使用したい場合は(デフォルトではtrueです)。

于 2009-11-04T20:16:29.773 に答える
7

私が見ている問題は、Poolが引数リストを介した共有データのピクルス化をサポートしていないことです。これが、「オブジェクトは継承を通じてプロセス間でのみ共有されるべきである」というエラーメッセージの意味です。共有データは継承する必要があります。つまり、Poolクラスを使用して共有する場合はグローバルです。

それらを明示的に渡す必要がある場合は、multiprocessing.Processを使用する必要がある場合があります。これがあなたの作り直された例です:

from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  q = Queue()
  keys = ['a', 'b', 's', 'd']
  workers = [Process(target=count_it, args = (q, toShare, key))
    for key in keys]

  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print q.get(),

出力:('s'、9)('a'、2)('b'、3)('d'、12)

キューの要素の順序は異なる場合があります。

これをより一般的でプールに似たものにするために、固定のN個のプロセスを作成し、キーのリストをN個に分割してから、ラッパー関数をプロセスターゲットとして使用して、リスト内の各キーに対してcount_itを呼び出すことができます。次のように渡されます。

def wrapper( q, arr, keys ):
  for k in keys:
    count_it(q, arr, k)
于 2009-11-10T02:08:28.457 に答える
6

あなたが見ている場合:

RuntimeError:同期されたオブジェクトは、継承を通じてプロセス間でのみ共有する必要があります

multiprocessing.Managerこの制限がないため、使用を検討してください。マネージャーは、おそらく完全に別のプロセスで実行されることを考慮して作業します。

import ctypes
import multiprocessing

# Put this in a method or function, otherwise it will run on import from each module:
manager = multiprocessing.Manager()
counter = manager.Value(ctypes.c_ulonglong, 0)
counter_lock = manager.Lock()  # pylint: disable=no-member

with counter_lock:
    counter.value = count = counter.value + 1
于 2019-10-02T20:08:31.793 に答える