複雑な dtype の配列から始めることができることに注意してください。
In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32')
同種のdtypeの配列として表示します。
In [5]: data2 = data.view('float32')
その後、複雑な dtype に変換します。
In [7]: data3 = data2.view('float32, (250000,2)float32')
dtype の変更は非常に迅速な操作です。基になるデータには影響せず、NumPy が解釈する方法のみに影響します。したがって、dtype の変更は事実上コストがかかりません。
したがって、単純な (同種の) dtype を持つ配列について読んだことは、上記のトリックを使用して複雑な dtype に簡単に適用できます。
以下のコードは、 JF Sebastian の回答 hereから多くのアイデアを借りています。
import numpy as np
import multiprocessing as mp
import contextlib
import ctypes
import struct
import base64
def decode(arg):
chunk, counter = arg
print len(chunk), counter
for x in chunk:
peak_counter = 0
data_buff = base64.b64decode(x)
buff_size = len(data_buff) / 4
unpack_format = ">%dL" % buff_size
index = 0
for y in struct.unpack(unpack_format, data_buff):
buff1 = struct.pack("I", y)
buff2 = struct.unpack("f", buff1)[0]
with shared_arr.get_lock():
data = tonumpyarray(shared_arr).view(
[('f0', '<f4'), ('f1', '<f4', (250000, 2))])
if (index % 2 == 0):
data[counter][1][peak_counter][0] = float(buff2)
else:
data[counter][1][peak_counter][1] = float(buff2)
peak_counter += 1
index += 1
counter += 1
def pool_init(shared_arr_):
global shared_arr
shared_arr = shared_arr_ # must be inherited, not passed as an argument
def tonumpyarray(mp_arr):
return np.frombuffer(mp_arr.get_obj())
def numpy_array(shared_arr, peaks):
"""Fills the NumPy array 'data' with m/z-intensity values acquired
from b64 decoding and unpacking the binary string read from the
mzXML file, which is stored in the list 'peaks'.
The m/z values are assumed to be ordered without validating this
assumption.
Note: This function uses multi-processing
"""
processors = mp.cpu_count()
with contextlib.closing(mp.Pool(processes=processors,
initializer=pool_init,
initargs=(shared_arr, ))) as pool:
chunk_size = int(len(peaks) / processors)
map_parameters = []
for i in range(processors):
counter = i * chunk_size
# WARNING: I removed -1 from (i + 1)*chunk_size, since the right
# index is non-inclusive.
chunk = peaks[i*chunk_size : (i + 1)*chunk_size]
map_parameters.append((chunk, counter))
pool.map(decode, map_parameters)
if __name__ == '__main__':
shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250)
peaks = ...
numpy_array(shared_arr, peaks)
割り当てを実行するさまざまなプロセスが保証できる場合
if (index % 2 == 0):
data[counter][1][peak_counter][0] = float(buff2)
else:
data[counter][1][peak_counter][1] = float(buff2)
同じ場所でデータを変更するために競合することは決してありません。そうすれば、ロックの使用を実際に忘れることができると思います
with shared_arr.get_lock():
しかし、私はあなたのコードを十分に理解していないので、安全のためにロックを含めました。