6

大きな ctype 配列をセグメント化し、それらを並行して処理しています。以下のエラーが表示されましたが、配列のあるセグメントが別のセグメントより先に処理を終了しているためだと思います。process.join() を使用してプロセスの最初のセットを待機させようとしましたが、うまくいきません。アイデア?

Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored

使用:

    ....

        with closing(multiprocessing.Pool(initializer=init(array))) as p:
            del array #Since the array is now stored in a shared array destroy the array ref for memory reasons

            step = y // cores
            if step != 0:
                jobs =[]
                for i in range (0, y, step):
                    process = p.Process(target=stretch, args= (shared_arr,slice(i, i+step)),kwargs=options)
                    jobs.append(process)
                    process.start()

                for j in jobs:
                    j.join()

    del jobs
    del process

アップデート:

 #Create an ctypes array
        array = ArrayConvert.SharedMemArray(array)
        #Create a global of options
        init_options(options) #options is a dict
        with closing(multiprocessing.Pool(initializer=init(array))) as p:
            del array #Since the array is not stored in a shared array destroy the array ref for memory reasons


            step = y // cores
            if step != 0:
                for i in range (0, y, step):
                    #Package all the options into a global dictionary

                    p.map_async(stretch,[slice(i, i+step)])

                    #p.apply_async(stretch,args=(shared_arr,slice(i, i+step)),kwargs=options)

        p.join()        

def init_options(options_):
    global kwoptions
    kwoptions = options_

map_async に渡す関数は別のモジュールに格納されているため、その関数にグローバル kwoptions を渡すのに苦労しています。このようにモジュール間でグローバルを渡すのは正しくないようです (unpythonic)。これは、map_async を介して kwargs を渡すことができる方法ですか。

別のもの (適用またはプロセス) を使用してマルチプロセッシングを作り直す必要がありますか?

4

2 に答える 2

2

そこで、コードを作り直してプールを削除することで、これを機能させました(JF Sebastianのコメントによる)。

擬似コード:

initialize the shared array
determine step size
create an empty list of jobs
create the process, pass it the kwargs, and append it to the job list
start the jobs
join the jobs

それがグーグルユーザーに役立つ場合のコードは次のとおりです。

#Initialize the ctypes array
        init(array)
        #Remove the reference to the array (to preserve memory on multiple iterations.
        del array

        step = y // cores
        jobs = []
        if step != 0:
            for i in range(0,y,step):        
                p = multiprocessing.Process(target=stretch,args= (shared_arr,slice(i, i+step)),kwargs=options)
                jobs.append(p)

            for job in jobs:
                job.start()
            for job in jobs:
                job.join()
于 2012-08-08T18:14:31.873 に答える
1

initializerの引数Pool()は関数を受け入れます。initializer=init(array)と置き換えますinitializer=init, initargs=(array,)

f()ファミリで使用される関数にキーワード引数を渡すにpool.*map*は、ラッパーを作成できますmp_f()

#!/usr/bin/env python
import logging
import multiprocessing as mp
from contextlib import closing

def init(shared_array_):
    # globals that should be available in worker processes should be
    # initialized here
    global shared_array
    shared_array = shared_array_

def f(interval, a=None, b=None):
    mp.get_logger().info("interval=%r, a=%r, b=%r" % (interval, a, b))
    shared_array[interval] = [a + interval.start]*b # fake computations

def mp_f(arg_kwargs):
    try:
        arg, kwargs = arg_kwargs
        return f(arg, **kwargs) # pass keyword args to f()
    except Exception:
        mp.get_logger().error("f%r failed" % (arg_kwargs,))

def main():
    mp.log_to_stderr().setLevel(logging.INFO)

    N = 10**6
    array = mp.RawArray('i', N) # create shared array

    # create workers pool; use all available CPU cores
    with closing(mp.Pool(initializer=init, initargs=(array,))) as p:
        options = dict(a=5, b=N//4) # dummy options
        step = options['b']
        args = ((slice(i, i+step), options) for i in range(0, N, step))
        for _ in p.imap_unordered(mp_f, args): # submit jobs
            pass
    p.join()
    mp.get_logger().info(array[::step])

if __name__=="__main__":
    mp.freeze_support() # for py2exe and the-like on Windows
    main()
于 2012-08-09T10:28:27.683 に答える