4

デコレーター内のマルチプロセッシングに関する質問を更新したいと思います(私の以前の質問は私には死んでいるようです:))。私はこの問題に出くわしましたが、残念ながらこれを解決する方法がわかりません。アプリケーションが必要なため、デコレータ内でマルチプロセッシングを使用する必要がありますが...デコレータ内でマルチプロセッシングを使用すると、エラーが発生します: Can't pickle <function run_testcase at 0x00000000027789C8>: it's not found as __main__.run_testcase. 一方、通常の関数のようにマルチプロセッシング関数を呼び出すと、機能wrapper(function,*arg)します。これは非常にトリッキーですが、何が間違っているのかわかりません。これはPythonエラーであると結論付けようとしています:)。おそらく誰かがこの問題の回避策を知っていて、同じ構文を残しています。このコードを Windows で実行します (残念ながら)。

前の質問:デコレータ内でマルチプロセッシングを使用するとエラーが発生します: 関数をピクルできません...次のように見つかりません

このエラーをシミュレートする最も簡単なコード:

from multiprocessing import Process,Event

class ExtProcess(Process):
    def __init__(self, event,*args,**kwargs):
        self.event=event
        Process.__init__(self,*args,**kwargs)

    def run(self):
        Process.run(self)
        self.event.set()

class PythonHelper(object):

    @staticmethod
    def run_in_parallel(*functions):
        event=Event()
        processes=dict()
        for function in functions:
            fname=function[0]
            try:fargs=function[1]
            except:fargs=list()
            try:fproc=function[2]
            except:fproc=1
            for i in range(fproc):
                process=ExtProcess(event,target=fname,args=fargs)
                process.start()
                processes[process.pid]=process
        event.wait()
        for process in processes.values():
            process.terminate()
        for process in processes.values():
            process.join()
class Recorder(object):
    def capture(self):
        while True:print("recording")
from z_helper import PythonHelper
from z_recorder import Recorder

def wrapper(fname,*args):
    try:
        PythonHelper.run_in_parallel([fname,args],[Recorder().capture])
        print("success")
    except Exception as e:
        print("failure: {}".format(e))
from z_wrapper import wrapper
from functools import wraps

class Report(object):
    @staticmethod
    def debug(fname):
        @wraps(fname)
        def function(*args):
            wrapper(fname,args)
        return function

実行中:

from z_report import Report
import time

class Test(object):
    @Report.debug
    def print_x(self,x):
        for index,data in enumerate(range(x)):
            print(index,data); time.sleep(1)

if __name__=="__main__":
    Test().print_x(10)

以前のバージョンに @wraps を追加しました

私のトレースバック:

Traceback (most recent call last):
  File "C:\Interpreters\Python32\lib\pickle.py", line 679, in save_global
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'run_testcase'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\EskyTests\w_Logger.py", line 19, in <module>
    logger.run_logger()
  File "C:\EskyTests\w_Logger.py", line 14, in run_logger
    self.run_testcase()
  File "C:\EskyTests\w_Decorators.py", line 14, in wrapper
    PythonHelper.run_in_parallel([function,args],[recorder.capture])
  File "C:\EskyTests\w_PythonHelper.py", line 25, in run_in_parallel
    process.start()
  File "C:\Interpreters\Python32\lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "C:\Interpreters\Python32\lib\multiprocessing\forking.py", line 267, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Interpreters\Python32\lib\multiprocessing\forking.py", line 190, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Interpreters\Python32\lib\pickle.py", line 237, in dump
    self.save(obj)
  File "C:\Interpreters\Python32\lib\pickle.py", line 344, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Interpreters\Python32\lib\pickle.py", line 432, in save_reduce
    save(state)
  File "C:\Interpreters\Python32\lib\pickle.py", line 299, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Interpreters\Python32\lib\pickle.py", line 623, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Interpreters\Python32\lib\pickle.py", line 656, in _batch_setitems
    save(v)
  File "C:\Interpreters\Python32\lib\pickle.py", line 299, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Interpreters\Python32\lib\pickle.py", line 683, in save_global
    (obj, module, name))
_pickle.PicklingError: Can't pickle <function run_testcase at 0x00000000027725C8>: it's not found as __main__.run_testcase
4

1 に答える 1

4

multiprocessingモジュールは、スレーブ プロセスで pickler を呼び出すことにより、そのスレーブ プロセス内の関数を「呼び出し」ます。これは、作成した IPC インターフェイスを介して関数の名前をスレーブ プロセスに送信する必要があるためです。pickler は使用する適切な名前を見つけてそれを送信し、反対側では unpickler がその名前を関数に変換します。

関数がクラス メンバである場合、助けがなければ適切にピクルすることはできません。@staticmethodメンバーには typefunctionではなく typeinstancemethodがあり、pickler をだますため、これはさらに悪いことです。を使用しなくても、これは非常に簡単に確認できますmultiprocessing

import pickle

class Klass(object):
    @staticmethod
    def func():
        print 'func()'
    def __init__(self):
        print 'Klass()'

obj = Klass()
obj.func()
print pickle.dumps(obj.func)

生成:

Klass()
func()
Traceback (most recent call last):
 ...
pickle.PicklingError: Can't pickle <function func at 0x8017e17d0>: it's not found as __main__.func

のような通常の非静的メソッドをピクルしようとするとobj.__init__、ピッカーはそれが実際にインスタンスメソッドであることを認識するため、問題はより明確になります。

TypeError: can't pickle instancemethod objects

ただし、すべてが失われるわけではありません。間接的なレベルを追加するだけです。ターゲットプロセスでインスタンス バインディングを作成する通常の関数を提供し、少なくとも 2 つの引数 (pickle 可能なクラスインスタンスと関数の名前) を送信できます。完全を期すために、関数を呼び出すときに使用する引数も追加します。次に、ターゲット プロセスでこの通常の関数を呼び出すと、クラスのメンバー関数が呼び出されます。

def call_name(instance, name, *args = (), **kwargs = None):
    "helper function for multiprocessing: call instance.getattr(name)"
    if kwargs is None:
        kwargs = {}
    getattr(instance, name)(*args, **kwargs)

代わりに(これはリンクされた投稿からコピーされます):

PythonHelper.run_in_parallel([self.run_testcase],[recorder.capture])

次のようにします (呼び出しシーケンスをいじる必要があるかもしれません):

PythonHelper.run_in_parallel([call_name, (self, 'run_testcase')],
    [recorder.capture])

(注: これはすべてテストされておらず、さまざまなエラーがある可能性があります)。


アップデート

あなたが投稿した新しいコードを取り、試してみました。

まず、インデントを修正する必要がありましたz_report.py(すべてのインデントを解除class Report)。

それが完了したら、それを実行すると、表示されるエラーとはかなり異なるエラーが発生しました。

Process ExtProcess-1:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/tmp/t/marcin/z_helper.py", line 9, in run
    Process.run(self)
  File "/usr/local/lib/python2.7/multiprocessing/process.py", line 114, in run
recording
[infinite spew of "recording" messages]

終わりのない「録音」メッセージを修正するには:

diff --git a/z_recorder.py b/z_recorder.py
index 6163a87..a482268 100644
--- a/z_recorder.py
+++ b/z_recorder.py
@@ -1,4 +1,6 @@
+import time
 class Recorder(object):
     def capture(self):
-        while True:print("recording")
-
+        while True:
+            print("recording")
+            time.sleep(5)

これにより、残りの問題が1つ残りました: への引数が間違っていますprint_x:

TypeError: print_x() takes exactly 2 arguments (1 given)

この時点で、Python は実際にすべて適切なことを行っていましたが、それz_wrapper.wrapperは少し熱心すぎるだけです。

diff --git a/z_wrapper.py b/z_wrapper.py
index a0c32bf..abb1299 100644
--- a/z_wrapper.py
+++ b/z_wrapper.py
@@ -1,7 +1,7 @@
 from z_helper import PythonHelper
 from z_recorder import Recorder

-def wrapper(fname,*args):
+def wrapper(fname,args):
     try:
         PythonHelper.run_in_parallel([fname,args],[Recorder().capture])
         print("success")

ここでの問題は、 に到達するまでz_wrapper.wrapperに、関数の引数がすべてタプルにまとめられていることです。 z_report.Report.debugすでに持っています:

    def function(*args):

そのため、2 つの引数 (この場合はインスタンスのインスタンスmain.Testと値10) がタプルになっています。引数を提供するために、z_wrapper.wrapperその(単一の)タプルを に渡したいだけです。PythonHelper.run_in_parallel別のタプルを追加する*argsと、そのタプルは (今回は 1 つの要素の) 別のタプルにラップされます。print "args:", args(これは、 inを追加することで確認できますz_wrapper.wrapper。)

于 2012-04-29T09:17:42.250 に答える