3

PyTablesandを使用して、多数の HDF5 ファイル (「多数」は N > 1000 ファイルを意味します) を読み込もうとしていますmultiprocessing。基本的に、データを読み取って RAM に保存するクラスを作成します。シーケンシャル モードでは問題なく動作するので、並列化してパフォーマンスを向上させたいと考えています。

flatten()ファイルの読み取りを並列化するためにクラスに新しいメソッドを作成して、今のところダミーのアプローチを試みました。次の例は、私がやろうとしていることの単純化された例です。listf読み取るファイルの名前を含む文字列のリストであり、ファイルで読み取りnxたいny配列のサイズです。

import numpy as np
import multiprocessing as mp
import tables

class data:
    def __init__(self, listf, nx, ny, nproc=0):
        self.listinc = []
        for i in range(len(listf)):
             self.listinc.append((listf[i], nx, ny))

    def __del__(self):
        del self.listinc

    def get_dsets(self, tuple_inc):
        listf, nx, ny = tuple_inc
        x = np.zeros((nx, ny))
        f = tables.openFile(listf)
        x = np.transpose(f.root.x[:ny,:nx])
        f.close()
        return(x)

    def flatten(self):
        nproc = mp.cpu_count()*2

        def worker(tasks, results):
            for i, x in iter(tasks.get, 'STOP'):
                print i, x
                results.put(i, self.get_dsets(x))

        tasks   = mp.Queue()
        results = mp.Queue()
        manager = mp.Manager()
        lx      = manager.list()

        for i, out in enumerate(self.listinc):
            tasks.put((i, out))

        for i in range(nproc):
            mp.Process(target=worker, args=(tasks, results)).start()

        for i in range(len(self.listinc)):
            j, res = results.get()
            lx.append(res)

        for i in range(nproc):
            tasks.put('STOP')

さまざまなことを試しました (この単純な例で行ったようにmanager、データを取得するために a を使用するなど)、常にTypeError: an integer is required.

私は実際には共有配列を必要とせず (データを取得したいだけです)、データを取得した後、NumPy でそれを操作したいので、ctypes 配列は使用しません。

どんな考え、ヒント、または助けも大歓迎です!

編集:私が得る完全なエラーは次のとおりです:

Process Process-341:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/toto/test/rd_para.py", line 81, in worker
    results.put(i, self.get_dsets(x))
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 101, in put
    if not self._sem.acquire(block, timeout):
TypeError: an integer is required
4

1 に答える 1

0

答えは実はとてもシンプルでした...

では、worker取得するのはタプルであるため、次のことはできません。

result.put(i, self.get_dsets(x))

しかし、代わりに私はしなければなりません:

result.put((i, self.get_dsets(x)))

その後、完全にうまく機能します。

于 2013-01-17T16:35:17.013 に答える