319

簡単な例でエラーを再現できないこと、およびコードが複雑すぎて投稿できないことをお詫び申し上げます。通常の Python ではなく IPython シェルでプログラムを実行すると、うまくいきます。

この問題に関する以前のメモを調べました。それらはすべて、プールを使用してクラス関数内で定義された関数を呼び出すことが原因でした。しかし、これは私には当てはまりません。

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

助けていただければ幸いです。

更新:私がピクルする関数は、モジュールのトップレベルで定義されています。ネストされた関数を含む関数を呼び出しますが。つまり、ネストされた関数を持つf()呼び出しをg()呼び出し、私は呼び出しています。、、はすべてトップレベルで定義されています。このパターンでより簡単な例を試してみましたが、うまくいきます。h()i()pool.apply_async(f)f()g()h()

4

9 に答える 9

376

漬物の一覧 です。特に、関数はモジュールのトップレベルで定義されている場合にのみ pickle 可能です。

このコード片:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

あなたが投稿したものとほぼ同じエラーが発生します:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

問題は、poolすべてのメソッドが a を使用しmp.SimpleQueueてタスクをワーカー プロセスに渡すことです。を通過するものはすべてmp.SimpleQueue選択可能でなければならfoo.workず、モジュールの最上位で定義されていないため、選択可能ではありません。

トップレベルで以下を呼び出す関数を定義することで修正できますfoo.work()

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

は最上位レベルで定義されており foo、ピック可能であるため、 がピック可能であることに注意してください。Foofoo.__dict__

于 2012-01-10T14:54:13.477 に答える
122

pathos.multiprocesssingの代わりに を使用しmultiprocessingます。 を使用pathos.multiprocessingする のフォークです。Python ではほとんど何でもシリアル化できるため、より多くのものを並行して送信できます。フォークには、クラス メソッドに必要な複数の引数関数を直接操作する機能もあります。multiprocessingdilldillpathos

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

こちらから入手してくださいpathos(必要に応じてdill): https://github.com/uqfoundation

于 2014-01-25T01:34:08.600 に答える
39

この問題が発生したときmultiprocessingの簡単な解決策は、 から に切り替えるPoolことThreadPoolです。これは、インポート以外のコードを変更することなく実行できます。

from multiprocessing.pool import ThreadPool as Pool

これは、新しいプロセスを作成するのではなく、ThreadPool がメイン スレッドとメモリを共有するために機能します。これは、ピクルが必要ないことを意味します。

この方法の欠点は、Python がスレッドを処理するのに最適な言語ではないことです。Python はグローバル インタープリター ロックと呼ばれるものを使用してスレッド セーフを維持しているため、一部のユース ケースでは速度が低下する可能性があります。ただし、主に他のシステムと対話している場合 (HTTP コマンドの実行、データベースとの対話、ファイルシステムへの書き込み) は、コードが CPU に拘束されていない可能性が高く、大きな打撃を受けることはありません。実際、HTTP/HTTPS ベンチマークを書いているときに、ここで使用されているスレッド モデルのオーバーヘッドと遅延が少ないことがわかりました。これは、新しいプロセスを作成するオーバーヘッドが、新しいスレッドを作成するオーバーヘッドよりもはるかに高く、それ以外の場合、プログラムは単に HTTP を待機していたためです。反応。

したがって、Python ユーザー空間で大量のものを処理している場合、これは最善の方法ではない可能性があります。

于 2019-11-17T03:34:32.370 に答える
35

他の人が言ったようmultiprocessingに、Python オブジェクトはピクルできるワーカー プロセスにしか転送できません。unutbu で説明されているようにコードを再編成できない場合は、dill以下に示すように、データ (特にコード データ) を転送するための拡張ピクル/アンピクル機能を使用できます。

このソリューションでは、次のようなライブラリのインストールのみが必要でdillあり、他のライブラリは必要ありませんpathos

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()
于 2014-07-10T09:56:14.773 に答える
25

プロファイラーを使用することで、完全に機能するコードでそのエラー出力を正確に生成できることもわかりました。

これはWindows上にあったことに注意してください(フォークは少しエレガントではありません)。

私は走っていた:

python -m profile -o output.pstats <script> 

そして、プロファイリングを削除するとエラーが削除され、プロファイリングを配置するとエラーが復元されることがわかりました。私は以前のコードが機能することを知っていたので、私もバタバタしていました。何かがpool.pyを更新したかどうかを確認していました...その後、沈み込み感があり、プロファイリングを削除しました。それだけでした。

他の誰かがそれに遭遇した場合に備えて、アーカイブのためにここに投稿してください。

于 2012-10-31T04:25:05.103 に答える
4

このソリューションでは、dil のインストールのみが必要であり、pathos として他のライブラリは必要ありません

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

numpy 配列でも機能します。

于 2015-09-27T12:13:20.830 に答える