(1) デコレータを変更して、それが取る f オブジェクトがクラスのインスタンス メソッドである場合、それが返すエグゼキュータもそのクラス オブジェクトのインスタンス メソッドになるようにするにはどうすればよいでしょうか (ピクルできないというこのビジネスそれらのインスタンスメソッドをピクルできるので、起こりません)?
>>> myfoo.parSquare
<bound method Foo.executor of <__main__.Foo object at 0x101332510>>
ご覧のとおり、parSquare は実際にはインスタンス メソッドになったエグゼキュータです。デコレータは関数ラッパーのようなものなので、これは当然のことです。
関数デコレータのチェーンを作成するには? おそらく、デコレータの最も良い説明があります。
(2) _pickle_function と _unpickle_function メソッドを追加で作成した方が良いですか?
両方のタイプをピクルするために同じアルゴリズムを使用しているため、実際のところ、これcopy_reg.pickle(types.FunctionType, _pickle_method, _unpickle_method)
は少し奇妙に思えます。
より大きな問題はPicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
、エラー自体がなんとなく漠然としているように見えるのに、関数の検索に失敗したように見えるのはなぜですか?
何が起こっているのかというと、デコレータがあなたのケースで内部的に定義されたもので関数をオーバーライドしていると思いますparSquare
がexecutor
、executor
内部関数であるparallel
ため、インポートできないため、ルックアップが失敗しているようです。これは単なる予感です。
もっと簡単な例を試してみましょう。
>>> def parallel(function):
... def apply(values):
... from multiprocessing import Pool
... pool = Pool(4)
... result = pool.map(function, values)
... pool.close()
... pool.join()
... return result
... return apply
...
>>> @parallel
... def square(value):
... return value**2
...
>>>
>>> square([1,2,3,4])
Exception in thread Thread-1:
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 522, in __bootstrap_inner
self.run()
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 477, in run
self.__target(*self.__args, **self.__kwargs)
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
私たちが得ていたのとほとんど同じエラーです。
上記のコードは次と同等であることに注意してください。
def parallel(function):
def apply(values):
from multiprocessing import Pool
pool = Pool(4)
result = pool.map(function, values)
pool.close()
pool.join()
return result
return apply
def square(value):
return value**2
square = parallel(square)
同じエラーが発生しますが、関数の名前を変更しない場合にも注意してください。
>>> def parallel(function):
... def apply(values):
... from multiprocessing import Pool
... pool = Pool(4)
... result = pool.map(function, values)
... pool.close()
... pool.join()
... return result
... return apply
...
>>> def _square(value):
... return value**2
...
>>> square = parallel(_square)
>>> square([1,2,3,4])
[1, 4, 9, 16]
>>>
デコレータが名前を扱う方法を制御する方法を探していましたが、役に立たず、マルチプロセッシングでそれらを使用したいので、やや醜い回避策を思いつきました:
>>> def parallel(function):
... def temp(_):
... def apply(values):
... from multiprocessing import Pool
... pool = Pool(4)
... result = pool.map(function, values)
... pool.close()
... pool.join()
... return result
... return apply
... return temp
...
>>> def _square(value):
... return value*value
...
>>> @parallel(_square)
... def square(values):
... pass
...
>>> square([1,2,3,4])
[1, 4, 9, 16]
>>>
基本的には実際の関数をデコレータに渡してから、2 番目の関数を使用して値を処理しました。
完全ではありませんが、デコレータをより適切に処理するために、最初のコードを少し変更しました。
import types, copy_reg, multiprocessing as mp
def parallel(f):
def executor(*args):
_pool = mp.Pool(2)
func = getattr(args[0], f.__name__) # This will get the actual method function so we can use our own pickling procedure
_result = _pool.map(func, args[1])
_pool.close()
_pool.join()
return _result
return executor
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
cls_name = ''
if func_name.startswith('__') and not func_name.endswith('__'):
cls_name = cls.__name__.lstrip('_')
if cls_name:
func_name = '_' + cls_name + func_name
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
func = None
for cls in cls.mro():
if func_name in cls.__dict__:
func = cls.__dict__[func_name] # This will fail with the decorator, since parSquare is being wrapped around as executor
break
else:
for attr in dir(cls):
prop = getattr(cls, attr)
if hasattr(prop, '__call__') and prop.__name__ == func_name:
func = cls.__dict__[attr]
break
if func == None:
raise KeyError("Couldn't find function %s withing %s" % (str(func_name), str(cls)))
return func.__get__(obj, cls)
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
class Foo(object):
def __init__(self, args):
self.my_args = args
def squareArg(self, arg):
return arg**2
def par_squareArg(self):
p = mp.Pool(2) # Replace 2 with the number of processors.
q = p.map(self.squareArg, self.my_args)
p.close()
p.join()
return q
@parallel
def parSquare(self, num):
return self.squareArg(num)
if __name__ == "__main__":
myfoo = Foo([1,2,3,4])
print myfoo.par_squareArg()
print myfoo.parSquare(myfoo.my_args)
基本的に、これはまだ失敗します。AssertionError: daemonic processes are not allowed to have children
サブプロセスが関数を呼び出そうとしているため、サブプロセスは実際には名前だけでコードをコピーしないことに注意してください...
回避策の 1 つは、前に述べたものと似ています。
import types, copy_reg, multiprocessing as mp
def parallel(f):
def temp(_):
def executor(*args):
_pool = mp.Pool(2)
func = getattr(args[0], f.__name__) # This will get the actual method function so we can use our own pickling procedure
_result = _pool.map(func, args[1])
_pool.close()
_pool.join()
return _result
return executor
return temp
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
cls_name = ''
if func_name.startswith('__') and not func_name.endswith('__'):
cls_name = cls.__name__.lstrip('_')
if cls_name:
func_name = '_' + cls_name + func_name
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
func = None
for cls in cls.mro():
if func_name in cls.__dict__:
func = cls.__dict__[func_name] # This will fail with the decorator, since parSquare is being wrapped around as executor
break
else:
for attr in dir(cls):
prop = getattr(cls, attr)
if hasattr(prop, '__call__') and prop.__name__ == func_name:
func = cls.__dict__[attr]
break
if func == None:
raise KeyError("Couldn't find function %s withing %s" % (str(func_name), str(cls)))
return func.__get__(obj, cls)
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
class Foo(object):
def __init__(self, args):
self.my_args = args
def squareArg(self, arg):
return arg**2
def par_squareArg(self):
p = mp.Pool(2) # Replace 2 with the number of processors.
q = p.map(self.squareArg, self.my_args)
p.close()
p.join()
return q
def _parSquare(self, num):
return self.squareArg(num)
@parallel(_parSquare)
def parSquare(self, num):
pass
if __name__ == "__main__":
myfoo = Foo([1,2,3,4])
print myfoo.par_squareArg()
print myfoo.parSquare(myfoo.my_args)
[1, 4, 9, 16]
[1, 4, 9, 16]
最後にもう 1 つ、マルチスレッドの場合は細心の注意を払ってください。データをどのようにセグメント化するかによっては、主に値を前後にコピーしたり、サブプロセスを作成および破棄したりするオーバーヘッドが原因で、シングル スレッドよりもマルチスレッドの方が実際には遅くなる可能性があります。
常にシングル/マルチスレッドのベンチマークを行い、可能な場合はデータを適切にセグメント化してください。
適例:
import numpy
import time
from multiprocessing import Pool
def square(value):
return value*value
if __name__ == '__main__':
pool = Pool(5)
values = range(1000000)
start = time.time()
_ = pool.map(square, values)
pool.close()
pool.join()
end = time.time()
print "multithreaded time %f" % (end - start)
start = time.time()
_ = map(square, values)
end = time.time()
print "single threaded time %f" % (end - start)
start = time.time()
_ = numpy.asarray(values)**2
end = time.time()
print "numpy time %f" % (end - start)
v = numpy.asarray(values)
start = time.time()
_ = v**2
end = time.time()
print "numpy without pre-initialization %f" % (end - start)
私たちに与えます:
multithreaded time 0.484441
single threaded time 0.196421
numpy time 0.184163
numpy without pre-initialization 0.004490