41

マルチプロセッシングは python の強力なツールであり、もっと深く理解したいと思っています。通常の ロックキューを使用するタイミングと、マルチプロセッシングマネージャーを使用してこれらをすべてのプロセス間で共有するタイミングを知りたいです。

マルチプロセッシングの 4 つの異なる条件を使用して、次のテスト シナリオを考え出しました。

  1. プールとNO Managerの使用

  2. プールとマネージャーの使用

  3. 個々のプロセスとNO Managerの使用

  4. 個々のプロセスとマネージャーの使用

仕事

すべての条件がジョブ機能を実行しますthe_jobthe_jobロックによって保護されたいくつかの印刷で構成されています。さらに、関数への入力は単純にキューに入れられます (キューから復元できるかどうかを確認するため)。この入力は、(一番下に表示されている)と呼ばれるメイン スクリプトで作成された単純なインデックスidxです。range(10)start_scenario

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who= ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)

条件の成功は、キューからの入力を完全にリコールすることとして定義されますread_queue。下部の関数を参照してください。

状況、契約条項

条件 1 と 2 は、かなり自明です。条件 1 では、ロックとキューを作成し、これらをプロセス プールに渡します。

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.imap(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)

(ヘルパー関数make_iteratorは、この投稿の下部に記載されています。) 条件 1 は で失敗しRuntimeError: Lock objects should only be shared between processes through inheritanceます。

条件 2 はかなり似ていますが、ロックとキューはマネージャの監視下にあります。

def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.imap(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)

条件 3 では、新しいプロセスが手動で開始され、マネージャーなしでロックとキューが作成されます。

def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

条件 4 は似ていますが、ここでもマネージャーを使用しています。

def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

3 と 4 の両方の条件で、10 個のタスクのそれぞれに対して新しいプロセスを開始the_jobし、最大でncoresプロセスが同時に動作します。これは、次のヘルパー関数で実現されます。

def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocees

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                                   args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)

結果

条件 1 のみが失敗し ( RuntimeError: Lock objects should only be shared between processes through inheritance)、他の 3 つの条件は成功します。私はこの結果に頭を悩ませようとします。

プールはすべてのプロセス間でロックとキューを共有する必要があるのに、条件 3 の個々のプロセスは共有しないのはなぜですか?

私が知っているのは、プール条件 (1 と 2) では、イテレーターからのすべてのデータが酸洗を介して渡されるのに対し、単一プロセス条件 (3 と 4) では、イテレーターからのすべてのデータがメイン プロセスからの継承によって渡されることです (私はLinuxを使用)。子プロセス内からメモリが変更されるまで、親プロセスが使用するのと同じメモリがアクセスされると思います(コピーオンライト)。しかしlock.acquire()、これは変更する必要があり、子プロセスはメモリ内の別の場所に配置された別のロックを使用しますよね? 1 つの子プロセスは、マネージャーを介して共有されていない兄弟がロックをアクティブにしたことをどのように知るのでしょうか?

最後に、条件 3 と 4 がどれだけ異なるかという私の質問に多少関連があります。どちらも個別のプロセスを持っていますが、マネージャーの使用法が異なります。どちらも有効なコードと見なされますか? それとも、実際にマネージャーが必要ない場合は、マネージャーの使用を避けるべきですか?


完全なスクリプト

すべてをコピーして貼り付けてコードを実行したい人のために、完全なスクリプトを次に示します。

__author__ = 'Me and myself'

import multiprocessing as mp
import time

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who= ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)


def read_queue(queue):
    """Turns a qeue into a normal python list."""
    results = []
    while not queue.empty():
        result = queue.get()
        results.append(result)
    return results


def make_iterator(args, lock, queue):
    """Makes an iterator over args and passes the lock an queue to each element."""
    return ((arg, lock, queue) for arg in args)


def start_scenario(scenario_number = 1):
    """Starts one of four multiprocessing scenarios.

    :param scenario_number: Index of scenario, 1 to 4

    """
    args = range(10)
    ncores = 3
    if scenario_number==1:
        result =  scenario_1_pool_no_manager(the_job, args, ncores)

    elif scenario_number==2:
        result =  scenario_2_pool_manager(the_job, args, ncores)

    elif scenario_number==3:
        result =  scenario_3_single_processes_no_manager(the_job, args, ncores)

    elif scenario_number==4:
        result =  scenario_4_single_processes_manager(the_job, args, ncores)

    if result != args:
        print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
    else:
        print 'Scenario %d successful!' % scenario_number


def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.map(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocees

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                                   args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)


def main():
    """Runs 1 out of 4 different multiprocessing scenarios"""
    start_scenario(1)


if __name__ == '__main__':
    main()
4

1 に答える 1

35

multiprocessing.LockOS が提供する Semaphore オブジェクトを使用して実装されます。Linux では、子は を介し​​て親からセマフォへのハンドルを継承するだけos.forkです。これはセマフォのコピーではありません。実際には、ファイル記述子を継承できるのと同じ方法で、親が持っているのと同じハンドルを継承しています。一方、Windows は をサポートしていないためos.forkLock. これは、Windows APImultiprocessing.Lockを使用して、オブジェクトによって内部的に使用される Windows セマフォへの複製ハンドルを作成することによって行われます。DuplicateHandle

複製ハンドルは、元のハンドルと同じオブジェクトを参照します。したがって、オブジェクトへの変更は両方のハンドルを介して反映されます。

API を使用すると、複製されたハンドルのDuplicateHandle所有権を子プロセスに与えることができるため、子プロセスは、それを unpickle した後に実際に使用できます。子が所有する重複したハンドルを作成することにより、ロック オブジェクトを効果的に「共有」できます。

これがセマフォオブジェクトですmultiprocessing/synchronize.py

class SemLock(object):

    def __init__(self, kind, value, maxvalue):
        sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
        debug('created semlock with handle %s' % sl.handle)
        self._make_methods()

        if sys.platform != 'win32':
            def _after_fork(obj):
                obj._semlock._after_fork()
            register_after_fork(self, _after_fork)

    def _make_methods(self):
        self.acquire = self._semlock.acquire
        self.release = self._semlock.release
        self.__enter__ = self._semlock.__enter__
        self.__exit__ = self._semlock.__exit__

    def __getstate__(self):  # This is called when you try to pickle the `Lock`.
        assert_spawning(self)
        sl = self._semlock
        return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)

    def __setstate__(self, state): # This is called when unpickling a `Lock`
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        debug('recreated blocker with handle %r' % state[0])
        self._make_methods()

オブジェクトをピクルするときに呼び出されるのassert_spawning呼び出しに注意してください。__getstate__実装方法は次のとおりです。

#
# Check that the current thread is spawning a child process
#

def assert_spawning(self):
    if not Popen.thread_is_spawning():
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(self).__name__
            )

その関数は、Lockを呼び出して を「継承」していることを確認する関数ですthread_is_spawning。Linux では、このメソッドは以下を返しますFalse

@staticmethod
def thread_is_spawning():
    return False

これは、Linux が を継承するためにピクルする必要がないためです。したがって、実際に Linux で が呼び出されているLock場合は、継承していてはなりません。__getstate__Windows では、さらに多くの処理が行われます。

def dump(obj, file, protocol=None):
    ForkingPickler(file, protocol).dump(obj)

class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    _tls = thread._local()

    def __init__(self, process_obj):
        ...
        # send information to child
        prep_data = get_preparation_data(process_obj._name)
        to_child = os.fdopen(wfd, 'wb')
        Popen._tls.process_handle = int(hp)
        try:
            dump(prep_data, to_child, HIGHEST_PROTOCOL)
            dump(process_obj, to_child, HIGHEST_PROTOCOL)
        finally:
            del Popen._tls.process_handle
            to_child.close()


    @staticmethod
    def thread_is_spawning():
        return getattr(Popen._tls, 'process_handle', None) is not None

ここでは、オブジェクトに属性があるかどうかをthread_is_spawning返します。属性が で作成され、継承したいデータが を使用して親から子に渡され、属性が削除されることがわかります。期間中のみとなります。この python-ideas メーリング リスト スレッドによると、これは実際には Linuxと同じ動作をシミュレートするために追加された人為的な制限です。はいつでも実行できるため、 Windows は実際にはいつでもを渡すことができます。TruePopen._tlsprocess_handleprocess_handle__init__dumpthread_is_spawningTrue__init__os.forkLockDuplicateHandle

内部で使用するため、上記のすべてがQueueオブジェクトに適用されます。Lock

Lockオブジェクトの継承は を使用するよりも好ましいと言えManager.Lock()ます。 を使用する場合、Manager.Lockに対して行うすべての呼び出しLockを IPC 経由でプロセスに送信する必要があるためです。これは、呼び出し内にManagerある共有を使用するよりもはるかに遅くなります。Lock処理する。ただし、どちらのアプローチも完全に有効です。

最後に、 /キーワード引数を使用して、 aを使用せずLockに a のすべてのメンバーに aを渡すことができます。PoolManagerinitializerinitargs

lock = None
def initialize_lock(l):
   global lock
   lock = l

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    """
    lock = mp.Lock()
    mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,))
    queue = mp.Queue()

    iterator = make_iterator(args, queue)

    mypool.imap(jobfunc, iterator) # Don't pass lock. It has to be used as a global in the child. (This means `jobfunc` would need to be re-written slightly.

    mypool.close()
    mypool.join()

return read_queue(queue)

これが機能するのは、渡された引数が内で実行されるオブジェクトのメソッドにinitargs渡されるためです。__init__ProcessPool

于 2014-07-16T17:44:44.997 に答える