7

おそらく、Pythonのマルチプロセッシングプールコードに精通している人が私を助けてくれるでしょう。ネットワーク上の複数のホストに同時に(一度にN個)ソケット接続を介して接続し、いくつかのRPCを実行しようとしています。1つのホストが終了したら、次のホストをプールに追加して、すべてが完了するまで実行します。

そのためのいくつかのメソッドを備えたクラスHClassと、hostlistに含まれているホスト名のリストがあります。しかし、これを機能させるために、Poolのdocs.python.orgの例のいずれかを取得できていません。

私がこれまでに得たものを説明するための短いコードスニペット:

hostlist = [h1, h2, h3, h4, ....]
poolsize = 2

class HClass:
  def __init__(self, hostname="default"):
    self.hostname = hostname

  def go(self):
      # do stuff
      # do more stuff
  ....

if __name__ == "__main__":
  objs = [HClass(hostname=current_host) for current_host in hostlist]
  pool = multiprocessing.pool(poolsize)
  results = pool.apply_async(objs.go())

これまでのところ、私はこのトレースバックに恵まれています。

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'generator'>: attribute lookup __builtin__.generator failed

Control-Cを実行するまでプロセスがハングする場合。

4

3 に答える 3

6

プロセス間通信を最小限に抑えるように努めます。実際に送信する必要があるのは、ホスト名文字列だけのようです。

for host in hostlist:
    pool.apply_async(worker, args = (host,), callback = on_return)

例えば、

import multiprocessing as mp
import time
import logging

logger = mp.log_to_stderr(logging.INFO)

hostlist = ['h1', 'h2', 'h3', 'h4']*3
poolsize = 2

class HClass:
    def __init__(self, hostname="default"):
        self.hostname = hostname

    def go(self):
        logger.info('processing {h}'.format(h = self.hostname))
        time.sleep(1)
        return self.hostname

def worker(host):
    h = HClass(hostname = host)
    return h.go()

result = []
def on_return(retval):
    result.append(retval)

if __name__ == "__main__":
    pool = mp.Pool(poolsize)
    for host in hostlist:
        pool.apply_async(worker, args = (host,), callback = on_return)
    pool.close()
    pool.join()
    logger.info(result)
于 2013-01-02T04:34:54.450 に答える
1

これは、 PythonのマルチプロセッシングPool.map()を使用するときに<type'instancemethod'>をピクルスできない場合と同じ質問だと思います。

上記のリンクの回答からコピーしました。問題は、マルチプロセッシングはプロセス間でそれらをスリングするために物事をピクルスにする必要があり、バインドされたメソッドはピクルスできないことです。

1つのアプローチは、go関数をクラス外にするなど、無制限にすることです。または、その関数をcopy_regでパック可能にします

于 2013-01-02T03:34:54.933 に答える
1

@unutbuのソリューションに同意します...シンプルな方が良いです。ただし、クラスメソッドを送信する必要がある場合は、の代わりにをgo使用します。pathos.multiprocesssingmultiprocessing

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

ここでコードを取得します: https ://github.com/uqfoundation/pathos

于 2014-01-25T01:23:19.993 に答える