2

次のように、以前は単一の関数で処理していたバイナリ エンコードされた文字列を含む大きなリストがあります。

""" just included this to demonstrate the 'data' structure """
data=np.zeros(250,dtype='float32, (250000,2)float32')

def func numpy_array(data, peaks):
rt_counter=0
    for x in peaks:
        if rt_counter %(len(peaks)/20) == 0:
            update_progress()
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[rt_counter][1][peak_counter][0]=float(buff2)
            else:
                data[rt_counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        rt_counter+=1

私はマルチプロセッシングについて読んでいて、パフォーマンスが大幅に向上するかどうかを確認するためにそれを試してみたいと考えていました。次のように関数を 2 (ヘルパーと「呼び出し元」) に書き直しました。

def numpy_array(data, peaks):
    processors=mp.cpu_count #Might as well throw this directly in the mp.Pool (just for clarity for now)
    pool = mp.Pool(processes=processors)
    chunk_size=len(peaks)/processors
    for i in range(processors):
        counter = i*chunk_size
        chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
        pool.map(decode(data,chunk,counter))

def decode(data,chunk,counter):
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        print data[counter][1][10][0]
        counter+=1      

プログラムは実行されますが、CPU の 100 ~ 110% しか使用されず (トップによると)、プログラムが終了すると、プログラムがスローTypeError: map() takes at least 3 arguments (2 given)されます。 TypeError を引き起こす可能性があります)?CPU 使用率が低い原因は何ですか?

-- 回答を組み込んだコード --

def decode((data,chunk,counter)):
    print len(chunk), counter
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        counter+=1

def numpy_array(data, peaks):
    """Fills the NumPy array 'data' with m/z-intensity values acquired
    from b64 decoding and unpacking the binary string read from the 
    mzXML file, which is stored in the list 'peaks'.

    The m/z values are assumed to be ordered without validating this
    assumption.

    Note: This function uses multi-processing
    """
    processors=mp.cpu_count()
    pool = mp.Pool(processes=processors)
    chunk_size=int(len(peaks)/processors)
    map_parameters=[]
    for i in range(processors):
        counter = i*chunk_size
        chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
        map_parameters.append((data,chunk,counter))
    pool.map(decode,map_parameters) 

この最新バージョンは、プロセス内の配列 (配列に値が含まれる場所) を埋めるまでは「機能」しますが、すべてのプロセスが完了すると、各プロセスが配列のローカル コピーを取得するため、配列にアクセスすると値がゼロになります。

4

2 に答える 2

2

このようなものが動作するはずです

pool.map呼び出しごとに、関数とその関数のパラメーターのリストを受け取ることに注意してください。numpy_array元の例では、関数でそれを呼び出しているだけです。

関数は引数を 1 つだけ持つ必要があるため、引数をタプルにパッキングし、奇妙に見える二重括弧を入れますdecode(タプルのアンパッキングと呼ばれます)。

def numpy_array(data, peaks):
    processors=4
    pool = mp.Pool(processes=processors)
    chunk_size=len(data)/processors
    print range(processors)
    map_parameters = [] # new
    for i in range(processors):
        counter = i*chunk_size
        chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
        map_parameters.append((data,chunk,counter)) # new
    pool.map(decode, map_parameters) # new

def decode((data,chunk,counter)): # changed
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        print data[counter][1][10][0]
        counter+=1
于 2013-04-12T08:14:45.803 に答える
1

バグはあなたのnumpy_array機能にあります:

for i in range(processors):
    counter = i*chunk_size
    chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
    pool.map(decode(data,chunk,counter))

問題は、順番に呼び出しているmapため、一度に 1 つのプロセスしか実行していないことです。また、map署名pool.map(f(*args))map(f, ['list', 'of', 'data']).

data配列が非常に大きいか、将来的に大きくなる可能性があると想定しているため、コピーを作成しないようにパーシャルを使用します。

これは次のようになります。

import functools
decode_with_data = functools.partial(decode, data)
args = []
for i in range(processors):
    counter = i * chunk_size
    chunk = peaks[1*chunk_size:(i+1)*chunk_size-1]
    args.append(chunk, counter)
pool.map(decode_with_data, args)
于 2013-04-12T08:15:10.400 に答える