27

マルチプロセッシングモジュールでPool.map_async()(および)を使用するときに問題が発生します。に入力された関数が「通常の」関数であるPool.map()限り、正常に機能するループ並列関数を実装しました。Pool.map_async関数が例えばクラスへのメソッドである場合、私は:を取得しPicklingErrorます

cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

私はPythonを科学計算にのみ使用しているので、ピクルスの概念にあまり精通しておらず、今日それについて少し学びました。マルチプロセッシングPool.map()を使用しているときに<type'instancemethod'>をピクルすることができないなど、以前のいくつかの回答を見てきましたが、回答で提供されているリンクをたどっても、それを機能させる方法がわかりません。

私のコード。目的は、複数のコアを使用して通常のrvのベクトルをシミュレートすることです。これは単なる例であり、複数のコアで実行しても見返りがない場合があることに注意してください。

import multiprocessing as mp
import scipy as sp
import scipy.stats as spstat

def parfor(func, args, static_arg = None, nWorkers = 8, chunksize = None):
    """
    Purpose: Evaluate function using Multiple cores.

    Input:
        func       - Function to evaluate in parallel
        arg        - Array of arguments to evaluate func(arg)  
        static_arg - The "static" argument (if any), i.e. the variables that are      constant in the evaluation of func.
        nWorkers   - Number of Workers to process computations.
    Output:
        func(i, static_arg) for i in args.
    
    """
    # Prepare arguments for func: Collect arguments with static argument (if any)
    if static_arg != None:
        arguments = [[arg] + static_arg for arg in list(args)]
    else:
        arguments = args
    
    # Initialize workers
    pool = mp.Pool(processes = nWorkers) 

    # Evaluate function
    result = pool.map_async(func, arguments, chunksize = chunksize)
    pool.close()
    pool.join()

    return sp.array(result.get()).flatten() 

# First test-function. Freeze location and scale for the Normal random variates generator.
# This returns a function that is a method of the class Norm_gen. Methods cannot be pickled
# so this will give an error.
def genNorm(loc, scale):
    def subfunc(a):
        return spstat.norm.rvs(loc = loc, scale = scale, size = a)
    return subfunc

# Second test-function. The same as above but does not return a method of a class. This is a "plain" function and can be 
# pickled
def test(fargs):
    x, a, b = fargs
    return spstat.norm.rvs(size = x, loc = a, scale = b)

# Try it out.
N = 1000000

# Set arguments to function. args1 = [1, 1, 1,... ,1], the purpose is just to generate a random variable of size 1 for each 
# element in the output vector.
args1 = sp.ones(N)
static_arg = [0, 1] # standarized normal.

# This gives the PicklingError
func = genNorm(*static_arg)
sim = parfor(func, args1, static_arg = None, nWorkers = 12, chunksize = None)

# This is OK:
func = test
sim = parfor(func, args1, static_arg = static_arg, nWorkers = 12, chunksize = None)

マルチプロセッシングPool.map()を使用しているときに<type'instancemethod'>をピクルスにできないという質問への回答で提供されているリンクをたどると、Steven Bethard(ほぼ最後)がcopy_regモジュールの使用を提案します。彼のコードは次のとおりです。

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

import copy_reg
import types

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

これをどうやって利用できるのかよくわかりません。私が思いついたのは、コードの直前に置くことだけでしたが、役に立ちませんでした。簡単な解決策は、もちろん、機能するものを使用して、に関与しないようにすることcopy_regです。copy_reg毎回問題を回避することなく、マルチプロセッシングを最大限に活用するために適切に作業することに興味があります。

4

1 に答える 1

21

ここでの問題は、概念よりも「ピクル」エラー メッセージではありません。マルチプロセスは、その魔法を実行するために、「ワーカー」の異なるプロセスでコードをフォークします。

次に、データをシームレスにシリアライズおよびデシリアライズすることにより、別のプロセスとの間でデータを送受信します (これが pickle を使用する部分です)。

前後に渡されるデータの一部が関数である場合、呼び出し先プロセスに同じ名前の関数が存在すると想定し、関数名を文字列として渡します。関数はステートレスであるため、呼び出されたワーカー プロセスは、受信したデータを使用して同じ関数を呼び出すだけです。(Python 関数は pickle を介してシリアル化できないため、マスター プロセスとワーカー プロセスの間で参照のみが渡されます)

self関数がインスタンス内のメソッドである場合 - Python をコーディングする場合は、「自動」変数を使用する関数と同じものですが、その下では同じではありません。インスタンス (オブジェクト) はステートフルであるためです。つまり、ワーカー プロセスには、反対側で呼び出したいメソッドの所有者であるオブジェクトのコピーがありません。

メソッドを関数として map_async 呼び出しに渡す方法を回避することもできません。マルチプロセスは、渡すときに実際の関数ではなく、関数参照を使用するだけだからです。

そのため、(1) メソッドではなく関数をワーカー プロセスに渡すようにコードを変更し、オブジェクトが保持している状態を新しいパラメーターに変換して呼び出す必要があります。(2) ワーカー プロセス側で必要なオブジェクトを再構築し、その中の関数を呼び出す map_async 呼び出しの「ターゲット」関数を作成します。Python の最も単純なクラスはそれ自体が選択可能であるため、map_async 呼び出しで関数所有者自体であるオブジェクトを渡すことができ、「ターゲット」関数はワーカー側で適切なメソッド自体を呼び出します。

(2)「難しい」ように聞こえるかもしれませんが、オブジェクトのクラスをピクルできない場合を除き、おそらく次のようなものです。

import types

def target(object, *args, **kw):
    method_name = args[0]
    return getattr(object, method_name)(*args[1:])
(...)    
#And add these 3 lines prior to your map_async call:


    # Evaluate function
    if isinstance (func, types.MethodType):
        arguments.insert(0, func.__name__)
        func = target
    result = pool.map_async(func, arguments, chunksize = chunksize)

*免責事項:私はこれをテストしていません

于 2011-08-10T20:44:17.240 に答える