1

私はデコレーターに慣れていないので、最初のデコレーター プロジェクトでは理解できないかもしれませんが、私がやりたいのはparallel、単一の引数に謙虚に適用され、自動的に適用されるように見える関数を取るデコレーターを作成することです。で配布しmultiprocessing、引数のリストに適用される関数に変換します。

以前の質問に対するこの非常に役立つ回答をフォローアップしているため、クラスインスタンスメソッドをうまくピクルでき、回答のような例を取得して問題なく動作させることができます。

これは、並列デコレーターでの私の最初の試みです (スレッド化デコレーターについていくつかの Web ヒットを調べた後)。

###########
# Imports #
###########
import types, copy_reg, multiprocessing as mp
import pandas, numpy as np
### End Imports

##################
# Module methods #
##################

# Parallel decorator
def parallel(f):

    def executor(*args):
        _pool   = mp.Pool(2)
        _result = _pool.map_async(f, args[1:])
        # I used args[1:] because the input will be a
        # class instance method, so gotta skip over the self object.
        # but it seems like there ought to be a better way...

        _pool.close()
        _pool.join()
        return _result.get()
    return executor
### End parallel

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)
### End _pickle_method

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)
### End _unpickle_method

# This call to copy_reg.pickle allows you to pass methods as the first arg to
# mp.Pool methods. If you comment out this line, `pool.map(self.foo, ...)` results in
# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
# __builtin__.instancemethod failed
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
copy_reg.pickle(types.FunctionType, _pickle_method, _unpickle_method)
### End Module methods


##################
# Module classes #
##################
class Foo(object):


    def __init__(self, args):
        self.my_args = args
    ### End __init__

    def squareArg(self, arg):
        return arg**2
    ### End squareArg

    def par_squareArg(self):
        p = mp.Pool(2) # Replace 2 with the number of processors.
        q = p.map_async(self.squareArg, self.my_args)

        p.close()
        p.join()

        return q.get()
    ### End par_SquarArg

    @parallel
    def parSquare(self, num):
        return self.squareArg(num)
    ### End parSquare
### End Foo
### End Module classes


###########
# Testing #
###########
if __name__ == "__main__":

    myfoo = Foo([1,2,3,4])
    print myfoo.par_squareArg()
    print myfoo.parSquare(myfoo.my_args)

### End Testing

しかし、このアプローチを使用すると (同じ_pickle_methodandで酸洗関数を強力に武装しようとするばかげた試みで_unpickle_method)、最初にエラーAttributeError: 'function' object has no attribute 'im_func'が表示されますが、より一般的には、関数を酸洗できないというエラーが表示されます。

したがって、質問は 2 つあります。(1)デコレータを変更して、fそれが取るオブジェクトがクラスのインスタンスメソッドである場合、executorそれが返すものもそのクラスオブジェクトのインスタンスメソッドになるようにするにはどうすればよいですかそれらのインスタンスメソッドをピクルすることができるからです)?_pickle_function(2) 追加のメソッドを作成した方が良い_unpickle_functionですか? Python はモジュール レベルの関数をピクルできると思っていたので、私のコードがexecutorインスタンス メソッドにならない場合は、モジュール レベルの関数であるべきだと思われますが、なぜピクルできないのでしょうか。

4

2 に答える 2

3

(1) デコレータを変更して、それが取る f オブジェクトがクラスのインスタンス メソッドである場合、それが返すエグゼキュータもそのクラス オブジェクトのインスタンス メソッドになるようにするにはどうすればよいでしょうか (ピクルできないというこのビジネスそれらのインスタンスメソッドをピクルできるので、起こりません)?

>>> myfoo.parSquare
<bound method Foo.executor of <__main__.Foo object at 0x101332510>>

ご覧のとおり、parSquare は実際にはインスタンス メソッドになったエグゼキュータです。デコレータは関数ラッパーのようなものなので、これは当然のことです。

関数デコレータのチェーンを作成するには? おそらく、デコレータの最も良い説明があります。

(2) _pickle_function と _unpickle_function メソッドを追加で作成した方が良いですか?

両方のタイプをピクルするために同じアルゴリズムを使用しているため、実際のところ、これcopy_reg.pickle(types.FunctionType, _pickle_method, _unpickle_method)は少し奇妙に思えます。

より大きな問題はPicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed、エラー自体がなんとなく漠然としているように見えるのに、関数の検索に失敗したように見えるのはなぜですか?
何が起こっているのかというと、デコレータがあなたのケースで内部的に定義されたもので関数をオーバーライドしていると思いますparSquareexecutorexecutor内部関数であるparallelため、インポートできないため、ルックアップが失敗しているようです。これは単なる予感です。

もっと簡単な例を試してみましょう。

>>> def parallel(function):                        
...     def apply(values):
...         from multiprocessing import Pool
...         pool = Pool(4)
...         result = pool.map(function, values)
...         pool.close()
...         pool.join()
...         return result    
...     return apply
... 
>>> @parallel
... def square(value):
...     return value**2
... 
>>> 
>>> square([1,2,3,4])
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 522, in __bootstrap_inner
    self.run()
  File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

私たちが得ていたのとほとんど同じエラーです。
上記のコードは次と同等であることに注意してください。

def parallel(function):                        
    def apply(values):
        from multiprocessing import Pool
        pool = Pool(4)
        result = pool.map(function, values)
        pool.close()
        pool.join()
        return result    
    return apply    

def square(value):
    return value**2

square = parallel(square)

同じエラーが発生しますが、関数の名前を変更しない場合にも注意してください。

>>> def parallel(function):                        
...     def apply(values):
...         from multiprocessing import Pool
...         pool = Pool(4)
...         result = pool.map(function, values)
...         pool.close()
...         pool.join()
...         return result    
...     return apply    
... 
>>> def _square(value):
...     return value**2
... 
>>> square = parallel(_square)
>>> square([1,2,3,4])
[1, 4, 9, 16]
>>>

デコレータが名前を扱う方法を制御する方法を探していましたが、役に立たず、マルチプロセッシングでそれらを使用したいので、やや醜い回避策を思いつきました:

>>> def parallel(function):                
...     def temp(_):    
...         def apply(values):
...             from multiprocessing import Pool
...             pool = Pool(4)
...             result = pool.map(function, values)
...             pool.close()
...             pool.join()
...             return result    
...         return apply
...     return temp
... 
>>> def _square(value):
...     return value*value    
... 
>>> @parallel(_square)
... def square(values):
...     pass 
... 
>>> square([1,2,3,4])
[1, 4, 9, 16]
>>>

基本的には実際の関数をデコレータに渡してから、2 番目の関数を使用して値を処理しました。

完全ではありませんが、デコレータをより適切に処理するために、最初のコードを少し変更しました。

import types, copy_reg, multiprocessing as mp

def parallel(f):    
    def executor(*args):
        _pool   = mp.Pool(2)
        func = getattr(args[0], f.__name__) # This will get the actual method function so we can use our own pickling procedure
        _result = _pool.map(func, args[1])
        _pool.close()
        _pool.join()
        return _result
    return executor

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    func = None
    for cls in cls.mro():        
        if func_name in cls.__dict__:
            func = cls.__dict__[func_name] # This will fail with the decorator, since parSquare is being wrapped around as executor             
            break
        else:
            for attr in dir(cls):
                prop = getattr(cls, attr)                
                if hasattr(prop, '__call__') and prop.__name__ == func_name:
                    func = cls.__dict__[attr]
                    break
    if func == None:
        raise KeyError("Couldn't find function %s withing %s" % (str(func_name), str(cls)))        
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Foo(object):
    def __init__(self, args):
        self.my_args = args
    def squareArg(self, arg):
        return arg**2
    def par_squareArg(self):
        p = mp.Pool(2) # Replace 2 with the number of processors.
        q = p.map(self.squareArg, self.my_args)
        p.close()
        p.join()
        return q    
    @parallel
    def parSquare(self, num):
        return self.squareArg(num)  

if __name__ == "__main__":
    myfoo = Foo([1,2,3,4])
    print myfoo.par_squareArg()
    print myfoo.parSquare(myfoo.my_args)  

基本的に、これはまだ失敗します。AssertionError: daemonic processes are not allowed to have childrenサブプロセスが関数を呼び出そうとしているため、サブプロセスは実際には名前だけでコードをコピーしないことに注意してください...

回避策の 1 つは、前に述べたものと似ています。

import types, copy_reg, multiprocessing as mp

def parallel(f):    
    def temp(_):
        def executor(*args):
            _pool   = mp.Pool(2)
            func = getattr(args[0], f.__name__) # This will get the actual method function so we can use our own pickling procedure
            _result = _pool.map(func, args[1])
            _pool.close()
            _pool.join()
            return _result        
        return executor
    return temp

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    func = None
    for cls in cls.mro():        
        if func_name in cls.__dict__:
            func = cls.__dict__[func_name] # This will fail with the decorator, since parSquare is being wrapped around as executor             
            break
        else:
            for attr in dir(cls):
                prop = getattr(cls, attr)                
                if hasattr(prop, '__call__') and prop.__name__ == func_name:
                    func = cls.__dict__[attr]
                    break
    if func == None:
        raise KeyError("Couldn't find function %s withing %s" % (str(func_name), str(cls)))        
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class Foo(object):
    def __init__(self, args):
        self.my_args = args
    def squareArg(self, arg):
        return arg**2
    def par_squareArg(self):
        p = mp.Pool(2) # Replace 2 with the number of processors.
        q = p.map(self.squareArg, self.my_args)
        p.close()
        p.join()
        return q
    def _parSquare(self, num):    
        return self.squareArg(num)
    @parallel(_parSquare)
    def parSquare(self, num):
        pass    


if __name__ == "__main__":
    myfoo = Foo([1,2,3,4])
    print myfoo.par_squareArg()
    print myfoo.parSquare(myfoo.my_args)

[1, 4, 9, 16]
[1, 4, 9, 16]

最後にもう 1 つ、マルチスレッドの場合は細心の注意を払ってください。データをどのようにセグメント化するかによっては、主に値を前後にコピーしたり、サブプロセスを作成および破棄したりするオーバーヘッドが原因で、シングル スレッドよりもマルチスレッドの方が実際には遅くなる可能性があります。

常にシングル/マルチスレッドのベンチマークを行い、可能な場合はデータを適切にセグメント化してください。

適例:

import numpy
import time
from multiprocessing import Pool

def square(value):
    return value*value

if __name__ == '__main__':
    pool = Pool(5)
    values = range(1000000)
    start = time.time()
    _ = pool.map(square, values)
    pool.close()
    pool.join()
    end = time.time()

    print "multithreaded time %f" % (end - start)
    start = time.time()
    _ = map(square, values)
    end = time.time()
    print "single threaded time %f" % (end - start)

    start = time.time()
    _ = numpy.asarray(values)**2
    end = time.time()
    print "numpy time %f" % (end - start)

    v = numpy.asarray(values)
    start = time.time()
    _ = v**2
    end = time.time()
    print "numpy without pre-initialization %f" % (end - start)

私たちに与えます:

multithreaded time 0.484441
single threaded time 0.196421
numpy time 0.184163
numpy without pre-initialization 0.004490
于 2012-07-31T09:57:40.730 に答える
-1

これはあなたが探している答えではありませんが、Sage には、@parallelあなたが探しているものに沿ったデコレーターがあります。ドキュメントソースコードはオンラインで見つけることができます。

ただし、原則として、import pdb;pdb.set_trace()失敗している行の直前に追加し、見えているすべてのオブジェクトを調べます。使用している場合は、magic コマンドを使用するか、これらの行に沿って何かをipython実行できます。%pdb

于 2012-07-31T02:28:37.130 に答える