12

配列の共有に関するSOの質問をかなり読んだことがありますが、単純な配列には十分に単純に思えますが、私が持っている配列でそれを機能させようとして立ち往生しています。

import numpy as np
data=np.zeros(250,dtype='float32, (250000,2)float32')

mp.Array何らかの方法で を受け入れようとすることで、これを共有配列に変換しようとしdataました。また、ctypesをそのまま使用して配列を作成しようとしました:

import multiprocessing as mp
data=mp.Array('c_float, (250000)c_float',250)

コードを機能させる唯一の方法は、データを関数に渡すのではなく、エンコードされた文字列を渡して圧縮解除/デコードすることですが、これは n (文字列の数) プロセスが呼び出されることになり、冗長に見えます。私の望ましい実装は、バイナリ文字列のリストを x (プロセスの数) にスライスし、このチャンクを渡すことにdata基づいていindexますカスタム(ネストされた)numpy配列を使用すると、すでに大きな助けになります。data

PS: この質問は、 Python マルチプロセッシングからのフォローアップです。

4

2 に答える 2

11

複雑な dtype の配列から始めることができることに注意してください。

In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32')

同種のdtypeの配列として表示します。

In [5]: data2 = data.view('float32')

その後、複雑な dtype に変換します。

In [7]: data3 = data2.view('float32, (250000,2)float32')

dtype の変更は非常に迅速な操作です。基になるデータには影響せず、NumPy が解釈する方法のみに影響します。したがって、dtype の変更は事実上コストがかかりません。

したがって、単純な (同種の) dtype を持つ配列について読んだことは、上記のトリックを使用して複雑な dtype に簡単に適用できます。


以下のコードは、 JF Sebastian の回答 hereから多くのアイデアを借りています。

import numpy as np
import multiprocessing as mp
import contextlib
import ctypes
import struct
import base64


def decode(arg):
    chunk, counter = arg
    print len(chunk), counter
    for x in chunk:
        peak_counter = 0
        data_buff = base64.b64decode(x)
        buff_size = len(data_buff) / 4
        unpack_format = ">%dL" % buff_size
        index = 0
        for y in struct.unpack(unpack_format, data_buff):
            buff1 = struct.pack("I", y)
            buff2 = struct.unpack("f", buff1)[0]
            with shared_arr.get_lock():
                data = tonumpyarray(shared_arr).view(
                    [('f0', '<f4'), ('f1', '<f4', (250000, 2))])
                if (index % 2 == 0):
                    data[counter][1][peak_counter][0] = float(buff2)
                else:
                    data[counter][1][peak_counter][1] = float(buff2)
                    peak_counter += 1
            index += 1
        counter += 1


def pool_init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_  # must be inherited, not passed as an argument


def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())


def numpy_array(shared_arr, peaks):
    """Fills the NumPy array 'data' with m/z-intensity values acquired
    from b64 decoding and unpacking the binary string read from the
    mzXML file, which is stored in the list 'peaks'.

    The m/z values are assumed to be ordered without validating this
    assumption.

    Note: This function uses multi-processing
    """
    processors = mp.cpu_count()
    with contextlib.closing(mp.Pool(processes=processors,
                                    initializer=pool_init,
                                    initargs=(shared_arr, ))) as pool:
        chunk_size = int(len(peaks) / processors)
        map_parameters = []
        for i in range(processors):
            counter = i * chunk_size
            # WARNING: I removed -1 from (i + 1)*chunk_size, since the right
            # index is non-inclusive. 
            chunk = peaks[i*chunk_size : (i + 1)*chunk_size]
            map_parameters.append((chunk, counter))
        pool.map(decode, map_parameters)

if __name__ == '__main__':
    shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250)
    peaks = ...
    numpy_array(shared_arr, peaks)

割り当てを実行するさまざまなプロセスが保証できる場合

if (index % 2 == 0):
    data[counter][1][peak_counter][0] = float(buff2)
else:
    data[counter][1][peak_counter][1] = float(buff2)

同じ場所でデータを変更するために競合することは決してありません。そうすれば、ロックの使用を実際に忘れることができると思います

with shared_arr.get_lock():

しかし、私はあなたのコードを十分に理解していないので、安全のためにロックを含めました。

于 2013-04-12T17:59:16.660 に答える
0
from multiprocessing import Process, Array
import numpy as np
import time
import ctypes

def fun(a):
    a[0] = -a[0]
    while 1:
        time.sleep(2)
        #print bytearray(a.get_obj())
        c=np.frombuffer(a.get_obj(),dtype=np.float32)
        c.shape=3,3
        print 'haha',c


def main():
    a = np.random.rand(3,3).astype(np.float32)
    a.shape=1*a.size
    #a=np.array([[1,3,4],[4,5,6]])
    #b=bytearray(a)
    h=Array(ctypes.c_float,a)
    print "Originally,",h

    # Create, start, and finish the child process
    p = Process(target=fun, args=(h,))
    p.start()
    #p.join()
    a.shape=3,3
    # Print out the changed values
    print 'first',a
    time.sleep(3)
    #h[0]=h[0]+1
    print 'main',np.frombuffer(h.get_obj(), dtype=np.float32)



if __name__=="__main__":
    main()
于 2015-04-20T09:42:11.693 に答える