24

multiprocessing.Poolオブジェクトを使用していて、コンストラクターの設定initializerを使用して、グローバル名前空間にリソースを作成する初期化関数を渡しているとします。リソースにコンテキスト マネージャーがあるとします。コンテキスト管理されたリソースがプロセスの存続期間中存続する必要があるが、最後に適切にクリーンアップされる場合、コンテキスト管理リソースのライフサイクルをどのように処理しますか?

これまでのところ、次のようなものがあります。

resource_cm = None
resource = None


def _worker_init(args):
    global resource
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()

これ以降、プール プロセスはリソースを使用できます。ここまでは順調ですね。ただし、このクラスはor引数multiprocessing.Poolを提供しないため、クリーンアップの処理は少しトリッキーです。destructordeinitializer

私のアイデアの 1 つは、atexitモジュールを使用し、初期化子にクリーンアップを登録することです。このようなもの:

def _worker_init(args):
    global resource
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()

    def _clean_up():
        resource_cm.__exit__()

    import atexit
    atexit.register(_clean_up)

これは良いアプローチですか?これを行う簡単な方法はありますか?

EDIT:atexit動作していないようです。少なくとも私が上記で使用している方法ではないので、現時点ではまだこの問題の解決策がありません.

4

3 に答える 3

38

まず、これは本当に素晴らしい質問です!コードを少し掘り下げた後、multiprocessingこれを行う方法を見つけたと思います:

を開始すると、オブジェクトはmultiprocessing.Pool内部的にプールのメンバーごとにPoolオブジェクトを作成します。multiprocessing.Processこれらのサブプロセスが起動すると、_bootstrap次のような関数が呼び出されます。

def _bootstrap(self):
    from . import util
    global _current_process
    try:
        # ... (stuff we don't care about)
        util._finalizer_registry.clear()
        util._run_after_forkers()
        util.info('child process calling self.run()')
        try:
            self.run()
            exitcode = 0 
        finally:
            util._exit_function()
        # ... (more stuff we don't care about)

メソッドは、指定したオブジェクトをrun実際に実行するものです。内部キューを介して作業項目が到着するのを待機する、長時間実行される while ループを持つメソッドであるプロセスの場合。私たちにとって本当に興味深いのは、 :が呼び出された後に何が起こったかです。targetProcessPool self.runutil._exit_function()

結局のところ、その関数は、探しているものと非常によく似たクリーンアップを行います。

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=active_children,
                   current_process=current_process):
    # NB: we hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')  # Very interesting!
    _run_finalizers(0)

のドキュメント文字列は次の_run_finalizersとおりです。

def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''

このメソッドは、実際にはファイナライザー コールバックのリストを介して実行し、それらを実行します。

items = [x for x in _finalizer_registry.items() if f(x)]
items.sort(reverse=True)

for key, finalizer in items:
    sub_debug('calling %s', finalizer)
    try:
        finalizer()
    except Exception:
        import traceback
        traceback.print_exc()

完全。では、どうやって に入るの_finalizer_registryですか? レジストリにコールバックを追加する責任がある、ドキュメント化されていないオブジェクトが呼び出さFinalizeれています。multiprocessing.util

class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        assert exitpriority is None or type(exitpriority) is int

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        else:
            assert exitpriority is not None

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, _finalizer_counter.next())
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self  # That's what we're looking for!

それでは、すべてを例にまとめます。

import multiprocessing
from multiprocessing.util import Finalize

resource_cm = None
resource = None

class Resource(object):
    def __init__(self, args):
        self.args = args

    def __enter__(self):
        print("in __enter__ of %s" % multiprocessing.current_process())
        return self

    def __exit__(self, *args, **kwargs):
        print("in __exit__ of %s" % multiprocessing.current_process())

def open_resource(args):
    return Resource(args)

def _worker_init(args):
    global resource
    print("calling init")
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()
    # Register a finalizer
    Finalize(resource, resource.__exit__, exitpriority=16)

def hi(*args):
    print("we're in the worker")

if __name__ == "__main__":
    pool = multiprocessing.Pool(initializer=_worker_init, initargs=("abc",))
    pool.map(hi, range(pool._processes))
    pool.close()
    pool.join()

出力:

calling init
in __enter__ of <Process(PoolWorker-1, started daemon)>
calling init
calling init
in __enter__ of <Process(PoolWorker-2, started daemon)>
in __enter__ of <Process(PoolWorker-3, started daemon)>
calling init
in __enter__ of <Process(PoolWorker-4, started daemon)>
we're in the worker
we're in the worker
we're in the worker
we're in the worker
in __exit__ of <Process(PoolWorker-1, started daemon)>
in __exit__ of <Process(PoolWorker-2, started daemon)>
in __exit__ of <Process(PoolWorker-3, started daemon)>
in __exit__ of <Process(PoolWorker-4, started daemon)>

ご覧のとおり、プール__exit__時にすべてのワーカーで呼び出されjoin()ます。

于 2014-07-13T15:48:30.933 に答える