25

マルチプロセッシングのプロセスとキューを使用しています。いくつかの関数を並行して開始し、ほとんどの関数は適切に動作します。終了し、出力がキューに送られ、.is_alive() == False として表示されます。しかし、何らかの理由で、いくつかの機能が動作していません。関数の最後の行 (「Finished」という print ステートメント) が完了した後でも、常に .is_alive() == True と表示されます。これは、起動する関数のセットに関係なく、1 つしかない場合でも発生します。並行して実行しない場合、関数は正常に動作し、正常に戻ります。どのようなことが問題になる可能性がありますか?

ジョブを管理するために使用している一般的な関数を次に示します。私が示していないのは、私がそれに渡している関数だけです。それらは長く、多くの場合matplotlibを使用し、いくつかのシェルコマンドを起動することもありますが、失敗したものの共通点がわかりません.

def  runFunctionsInParallel(listOf_FuncAndArgLists):
    """
    Take a list of lists like [function, arg1, arg2, ...]. Run those functions in parallel, wait for them all to finish, and return the list of their return values, in order.   
    """
    from multiprocessing import Process, Queue

    def storeOutputFFF(fff,theArgs,que): #add a argument to function for assigning a queue
        print 'MULTIPROCESSING: Launching %s in parallel '%fff.func_name
        que.put(fff(*theArgs)) #we're putting return value into queue
        print 'MULTIPROCESSING: Finished %s in parallel! '%fff.func_name
        # We get this far even for "bad" functions
        return

    queues=[Queue() for fff in listOf_FuncAndArgLists] #create a queue object for each function
    jobs = [Process(target=storeOutputFFF,args=[funcArgs[0],funcArgs[1:],queues[iii]]) for iii,funcArgs in enumerate(listOf_FuncAndArgLists)]
    for job in jobs: job.start() # Launch them all
    import time
    from math import sqrt
    n=1
    while any([jj.is_alive() for jj in jobs]): # debugging section shows progress updates
        n+=1
        time.sleep(5+sqrt(n)) # Wait a while before next update. Slow down updates for really long runs.
        print('\n---------------------------------------------------\n'+ '\t'.join(['alive?','Job','exitcode','Func',])+ '\n---------------------------------------------------')
        print('\n'.join(['%s:\t%s:\t%s:\t%s'%(job.is_alive()*'Yes',job.name,job.exitcode,listOf_FuncAndArgLists[ii][0].func_name) for ii,job in enumerate(jobs)]))
        print('---------------------------------------------------\n')
    # I never get to the following line when one of the "bad" functions is running.
    for job in jobs: job.join() # Wait for them all to finish... Hm, Is this needed to get at the Queues?
    # And now, collect all the outputs:
    return([queue.get() for queue in queues])
4

1 に答える 1

18

わかりました、関数の出力が大きすぎると、キューを満たすために使用されるパイプが詰まるようです (私の大雑把な理解ですか? これは未解決/クローズされたバグですか? http://bugs.python.org/issue8237 )。質問のコードを変更して、バッファリング (プロセスの実行中にキューが定期的に空になる) ができるようにしました。これにより、すべての問題が解決されます。したがって、これはタスク (関数とその引数) のコレクションを受け取り、それらを起動し、出力を収集します。見た目がもっとシンプル/すっきりしていたらいいのにと思います。

編集 (2014 年 9 月; 2017 年 11 月の更新: 読みやすくするために書き直されました): それ以降に行った機能強化でコードを更新しています。新しいコード (同じ機能ですが、より優れた機能) はこちらです: https://gitlab.com/cpbl/cpblUtilities/blob/master/parallel.py

呼び出しの説明も下にあります。

def runFunctionsInParallel(*args, **kwargs):
    """ This is the main/only interface to class cRunFunctionsInParallel. See its documentation for arguments.
    """
    return cRunFunctionsInParallel(*args, **kwargs).launch_jobs()

###########################################################################################
###
class cRunFunctionsInParallel():
    ###
    #######################################################################################
    """Run any list of functions, each with any arguments and keyword-arguments, in parallel.
The functions/jobs should return (if anything) pickleable results. In order to avoid processes getting stuck due to the output queues overflowing, the queues are regularly collected and emptied.
You can now pass os.system or etc to this as the function, in order to parallelize at the OS level, with no need for a wrapper: I made use of hasattr(builtinfunction,'func_name') to check for a name.
Parameters
----------
listOf_FuncAndArgLists : a list of lists 
    List of up-to-three-element-lists, like [function, args, kwargs],
    specifying the set of functions to be launched in parallel.  If an
    element is just a function, rather than a list, then it is assumed
    to have no arguments or keyword arguments. Thus, possible formats
    for elements of the outer list are:
      function
      [function, list]
      [function, list, dict]
kwargs: dict
    One can also supply the kwargs once, for all jobs (or for those
    without their own non-empty kwargs specified in the list)
names: an optional list of names to identify the processes.
    If omitted, the function name is used, so if all the functions are
    the same (ie merely with different arguments), then they would be
    named indistinguishably
offsetsSeconds: int or list of ints
    delay some functions' start times
expectNonzeroExit: True/False
    Normal behaviour is to not proceed if any function exits with a
    failed exit code. This can be used to override this behaviour.
parallel: True/False
    Whenever the list of functions is longer than one, functions will
    be run in parallel unless this parameter is passed as False
maxAtOnce: int
    If nonzero, this limits how many jobs will be allowed to run at
    once.  By default, this is set according to how many processors
    the hardware has available.
showFinished : int
    Specifies the maximum number of successfully finished jobs to show
    in the text interface (before the last report, which should always
    show them all).
Returns
-------
Returns a tuple of (return codes, return values), each a list in order of the jobs provided.
Issues
-------
Only tested on POSIX OSes.
Examples
--------
See the testParallel() method in this module
    """
于 2012-08-07T22:46:56.343 に答える