0

私が現在持っているのは、データをチャンクして処理するかなり単純な実装です。816MBのファイルで約34秒で動作しますが、それ以上速くしたいです。どのビットが最も時間がかかるかを確認するために既にプロファイリングしましたが、かなりの時間がかかるもののほとんどは、Python モジュール関数に集中しています。その結果、パフォーマンスを向上させるために何ができるかについて行き詰まっています。どんな助けでも大歓迎です。プロファイルと関連するコードを以下に含めました。

Sun May  5 02:10:28 2013    chunking.prof

         50868044 function calls in 42.901 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   358204   13.791    0.000   30.722    0.000 reader_variants.py:361(_unpack_from)
  7164080    7.331    0.000   10.812    0.000 reader_variants.py:116(_null_terminate)
 10029712    4.762    0.000    4.762    0.000 reader_variants.py:210(_missing_values_mod)
        1    4.117    4.117    4.117    4.117 {numpy.core.multiarray.array}
   716407    3.751    0.000    5.927    0.000 {map}
 17193696    2.176    0.000    2.176    0.000 reader_variants.py:358(<lambda>)
  7164080    1.906    0.000    1.906    0.000 {method 'lstrip' of 'str' objects}
  7164080    1.574    0.000    1.574    0.000 {method 'index' of 'str' objects}
   358204    1.204    0.000   37.672    0.000 reader_variants.py:353(parse_records)
   358204    1.135    0.000    1.135    0.000 {_struct.unpack_from}
        1    0.417    0.417   42.901   42.901 <string>:1(<module>)
      779    0.349    0.000   38.021    0.049 reader_variants.py:349(process_chunk)
      779    0.330    0.000    0.330    0.000 {method 'read' of 'file' objects}
   358983    0.041    0.000    0.041    0.000 {len}
        1    0.009    0.009   42.484   42.484 reader_variants.py:306(genfromdta_cc)
      779    0.006    0.000    0.006    0.000 {method 'extend' of 'list' objects}
       48    0.000    0.000    0.000    0.000 reader_variants.py:324(<lambda>)
        1    0.000    0.000    4.117    4.117 numeric.py:256(asarray)
        1    0.000    0.000    0.000    0.000 {method 'seek' of 'file' objects}
        1    0.000    0.000    0.000    0.000 {sum}
        1    0.000    0.000    0.000    0.000 {method 'join' of 'str' objects}
        1    0.000    0.000    0.000    0.000 {zip}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}




def genfromdta_cc(self, missing_flt=-999., encoding=None, pandas=False,
                    convert_dates=True, size=1024*1024): # default chunk size 1mb
        """
        reads stata data by chunking the file
        """
        try:
            self._file.seek(self._data_location)
        except Exception:
            pass

        nobs = self._header['nobs']
        varnames = self._header['varlist']
        typlist = self._header['typlist']
        types = self._header['dtyplist']

        dt = np.dtype(zip(varnames, types))
        data=[]

        fmt = ''.join(map(lambda x: str(x)+'s' if type(x) is int else x, typlist))
        record_size = sum(self._col_sizes)

        maxrecords = size/record_size # max number of records we can fit in size

        if maxrecords > nobs: # if the file is smaller than the ideal chunk size
            chunk_size = nobs*record_size # read the entire file in
        else:   
            chunk_size = maxrecords * record_size
            chunk_size_leftover = (nobs*record_size)%chunk_size

        numchunks = nobs / maxrecords # number of chunks
        numchunks_leftover = nobs % maxrecords #number of records left over

        for i in xrange(numchunks):
            chunk = self._file.read(chunk_size)
            data.extend(self.process_chunk(chunk, fmt, record_size, missing_flt))

        # last chunk contains less than max number of records
        if numchunks_leftover > 0:
            chunk = self._file.read(chunk_size_leftover)
            data.extend(self.process_chunk(chunk, fmt, record_size, missing_flt))

        return np.asarray(data, dtype=dt) # return data as numpy array

def process_chunk(self, chunk, fmt, record_size, missing_flt):      
        iternum = len(chunk)/record_size #number of records to read
        return [self.parse_records(chunk, fmt, record_size, missing_flt, i) for i in xrange(iternum)]

def parse_records(self, chunk, fmt, record_size, missing_flt, offset):
        # create record
        record = self._unpack_from(fmt, chunk, record_size*offset)
        # check to see if None in record
        if None in record:
            record = map(lambda x: missing_flt if x is None else x, record)
        return tuple(record)

def _unpack_from(self, fmt, byt, offset):
        typlist = self._header['typlist']
        d = map(None, unpack_from(self._header['byteorder']+fmt, byt, offset))
        d = [self._null_terminate(d[i], self._encoding) if type(typlist[i]) is int else self._missing_values_mod(d[i], typlist[i]) for i in xrange(len(d))]         
        return d
4

1 に答える 1

1

1 つの方法は、ファイル全体をメモリに読み込むことです。数 GB の RAM があると仮定すると (数年前の PC でも珍しくありません)、816 MB は RAM に収まるはずです。その場合、チャンクを廃止できます。

import struct

with open('datafile.bin', 'r') as df:
    rawdata = df.read()

fmt = '17s244s244s53sh203s68sh14sff203s192s192s192s192s192s22sffffff23s36sffffff12s11s23s21sdhhfdfdfdfdf'
recsz = struct.calcsize(fmt)

results = []
for offset in xrange(0, len(rawdata)/recsize):
    results.append(struct.unpack_from(rawdata, fmt, offset))

コードから、レコードのサイズが一定のように見えますか? したがって、ファイル全体をメモリに読み込みたくない場合でも、ファイルをレコード サイズの断片に分けて読み込むことができます。

import struct

fmt = '17s244s244s53sh203s68sh14sff203s192s192s192s192s192s22sffffff23s36sffffff12s11s23s21sdhhfdfdfdfdf'
recsz = struct.calcsize(fmt)

results = []
with open('datafile.bin', 'r') as df:
    s = df.read(recsz)
    results.append(struct.unpack(fmt, s))

このアプローチは、 を使用してすべてのコアに分散することもできますmultiprocessing.Pool.map()。したがって、n 個のコアがある場合 n 個のプロセスでレコードの読み取りと解凍を行うことができます。せいぜいこれにより、必要な時間を 1/ nに短縮できます。(実際には、レコードをピクルしてマスター プロセスに送り返す必要があるため、もう少し時間がかかります。)

(注: 使用しているフォーマット文字列など、読み取っているデータの種類を教えていただけると助かります。)

于 2013-05-05T08:24:27.487 に答える