PyTables
andを使用して、多数の HDF5 ファイル (「多数」は N > 1000 ファイルを意味します) を読み込もうとしていますmultiprocessing
。基本的に、データを読み取って RAM に保存するクラスを作成します。シーケンシャル モードでは問題なく動作するので、並列化してパフォーマンスを向上させたいと考えています。
flatten()
ファイルの読み取りを並列化するためにクラスに新しいメソッドを作成して、今のところダミーのアプローチを試みました。次の例は、私がやろうとしていることの単純化された例です。listf
読み取るファイルの名前を含む文字列のリストであり、ファイルで読み取りnx
たいny
配列のサイズです。
import numpy as np
import multiprocessing as mp
import tables
class data:
def __init__(self, listf, nx, ny, nproc=0):
self.listinc = []
for i in range(len(listf)):
self.listinc.append((listf[i], nx, ny))
def __del__(self):
del self.listinc
def get_dsets(self, tuple_inc):
listf, nx, ny = tuple_inc
x = np.zeros((nx, ny))
f = tables.openFile(listf)
x = np.transpose(f.root.x[:ny,:nx])
f.close()
return(x)
def flatten(self):
nproc = mp.cpu_count()*2
def worker(tasks, results):
for i, x in iter(tasks.get, 'STOP'):
print i, x
results.put(i, self.get_dsets(x))
tasks = mp.Queue()
results = mp.Queue()
manager = mp.Manager()
lx = manager.list()
for i, out in enumerate(self.listinc):
tasks.put((i, out))
for i in range(nproc):
mp.Process(target=worker, args=(tasks, results)).start()
for i in range(len(self.listinc)):
j, res = results.get()
lx.append(res)
for i in range(nproc):
tasks.put('STOP')
さまざまなことを試しました (この単純な例で行ったようにmanager
、データを取得するために a を使用するなど)、常にTypeError: an integer is required
.
私は実際には共有配列を必要とせず (データを取得したいだけです)、データを取得した後、NumPy でそれを操作したいので、ctypes 配列は使用しません。
どんな考え、ヒント、または助けも大歓迎です!
編集:私が得る完全なエラーは次のとおりです:
Process Process-341:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/home/toto/test/rd_para.py", line 81, in worker
results.put(i, self.get_dsets(x))
File "/usr/lib/python2.7/multiprocessing/queues.py", line 101, in put
if not self._sem.acquire(block, timeout):
TypeError: an integer is required