3

いくつかの基本的な情報を含むヘッダーとともに、y センサーの x 個の int16 データ ポイントを含む非常に大きなバイナリ ファイルがあります。バイナリ ファイルは、最大 x サンプルまでの各サンプル時間の y 値として書き込まれ、次に別の一連の読み取り値などが書き込まれます。すべてのデータが必要numpy.fromfile()な場合は、これを使用しています。ただし、センサー データのサブセットまたは特定のセンサーのみが必要な場合は、現在、 、、およびをfor使用した恐ろしい二重ループがあり、それには永遠に時間がかかります。Pythonでこれをより速く行う別の方法はありますか? おそらく私はよく理解していませんか?それとも全体を使用してからサブサンプリングしますか?file.seek()file.read()struct.unpack()mmap()fromfile()

data = numpy.empty(num_pts, sensor_indices)
for i in range(num_pts):
    for j in range(sensor_indices):
        curr_file.seek(bin_offsets[j])
        data_binary = curr_file.read(2)
        data[j][i] = struct.unpack('h', data_binary)[0]

@rrauenza on からのアドバイスに従いましたmmap。これは素晴らしい情報でした。コードを次のように編集しました。

mm = mmap.mmap(curr_file.fileno(), 0, access=mmap.ACCESS_READ)
data = numpy.empty(num_pts,sensor_indices)
for i in range(num_pts):
    for j in range(len(sensor_indices)):
        offset += bin_offsets[j] * 2
        data[j][i] = struct.unpack('h', mm[offset:offset+2])[0]

これは以前よりも高速ですが、それでも桁違いに遅いです

shape = (x, y)
data = np.fromfile(file=self.curr_file, dtype=np.int16).reshape(shape)
data = data.transpose()
data = data[sensor_indices, :]
data = data[:, range(num_pts)]

これを、30 秒のデータを含む 16 個のセンサーのみである 30 Mb の小さなファイルでテストしました。元のコードは 160 秒、mmap105 秒、np.fromfileサブサンプリングは 0.33 秒でした。

残りの質問は -numpy.fromfile()小さいファイルを使用する方が明らかに優れていますが、数時間または数日のデータと最大 500 個のセンサーで最大 20 Gb になる可能性のあるはるかに大きなファイルでは問題が発生しますか?

4

1 に答える 1

6

私は間違いなく試してみmmap()ます:

https://docs.python.org/2/library/mmap.html

呼び出している場合、および抽出するたびに、多くのシステムコールのオーバーヘッドがある小さなビットをたくさん読んでいます。seek()read()int16

私は実証するために小さなテストを書きました:

#!/usr/bin/python

import mmap
import os
import struct
import sys

FILE = "/opt/tmp/random"  # dd if=/dev/random of=/tmp/random bs=1024k count=1024
SIZE = os.stat(FILE).st_size
BYTES = 2
SKIP = 10


def byfile():
    sum = 0
    with open(FILE, "r") as fd:
        for offset in range(0, SIZE/BYTES, SKIP*BYTES):
            fd.seek(offset)
            data = fd.read(BYTES)
            sum += struct.unpack('h', data)[0]
    return sum


def bymmap():
    sum = 0
    with open(FILE, "r") as fd:
        mm = mmap.mmap(fd.fileno(), 0, prot=mmap.PROT_READ)
        for offset in range(0, SIZE/BYTES, SKIP*BYTES):
            data = mm[offset:offset+BYTES]
            sum += struct.unpack('h', data)[0]
    return sum


if sys.argv[1] == 'mmap':
    print bymmap()

if sys.argv[1] == 'file':
    print byfile()

キャッシングを補うために、各メソッドを 2 回実行しました。time時間を測りたいので利用usersysました。

結果は次のとおりです。

[centos7:/tmp]$ time ./test.py file
-211990391

real    0m44.656s
user    0m35.978s
sys     0m8.697s
[centos7:/tmp]$ time ./test.py file
-211990391

real    0m43.091s
user    0m37.571s
sys     0m5.539s
[centos7:/tmp]$ time ./test.py mmap
-211990391

real    0m16.712s
user    0m15.495s
sys     0m1.227s
[centos7:/tmp]$ time ./test.py mmap
-211990391

real    0m16.942s
user    0m15.846s
sys     0m1.104s
[centos7:/tmp]$ 

(合計 -211990391 は、両方のバージョンが同じことを行うことを検証するだけです。)

各バージョンの 2 番目の結果を見るmmap()と、リアルタイムの約 1/3 です。ユーザー時間は ~1/2 で、システム時間は ~1/5 です。

おそらくこれを高速化するための他のオプションは次のとおりです。

(1) ご指摘のとおり、ファイル全体をロードします。小さな I/O の代わりに大きな I/O を使用すると、速度が向上する可能性があります。ただし、システム メモリを超えた場合は、ページングにフォールバックしますが、これはmmap()(ページ アウトする必要があるため) よりも悪化します。mmapすでにより大きな I/O を使用しているため、ここではあまり期待できません。

(2) 同時実行。 複数のスレッドを介して並行してファイルを読み取ると、速度が向上する可能性がありますが、Python GILを処理する必要があります。 マルチプロセッシングは GIL を回避することでうまく機能し、データをトップレベルのハンドラーに簡単に渡すことができます。ただし、これは次の項目である局所性に反します。I/O をよりランダムにする可能性があります。

(3) 地域。何らかの方法でデータを整理 (または読み取りを順序付け) して、データが互いに近くなるようにします。 mmap()システムのページサイズに従ってファイルをチャンクでページングします。

>>> import mmap
>>> mmap.PAGESIZE
4096
>>> mmap.ALLOCATIONGRANULARITY
4096
>>> 

データが近くにある場合 (4k チャンク内)、データは既にバッファー キャッシュに読み込まれています。

(4) より良いハードウェア。SSDのように。

これをSSDで実行しましたが、はるかに高速でした。アンパックが高価かどうか疑問に思って、python のプロファイルを実行しました。そうではありません:

$ python -m cProfile test.py mmap                                                                                                                        
121679286
         26843553 function calls in 8.369 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    6.204    6.204    8.357    8.357 test.py:24(bymmap)
        1    0.012    0.012    8.369    8.369 test.py:3(<module>)
 26843546    1.700    0.000    1.700    0.000 {_struct.unpack}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
        1    0.000    0.000    0.000    0.000 {method 'fileno' of 'file' objects}
        1    0.000    0.000    0.000    0.000 {open}
        1    0.000    0.000    0.000    0.000 {posix.stat}
        1    0.453    0.453    0.453    0.453 {range}

補遺:

好奇心に負けて試してみましたmultiprocessing。パーティショニングを詳しく調べる必要がありますが、アンパックの数 (53687092) は試行間で同じです:

$ time ./test2.py 4
[(4415068.0, 13421773), (-145566705.0, 13421773), (14296671.0, 13421773), (109804332.0, 13421773)]
(-17050634.0, 53687092)

real    0m5.629s
user    0m17.756s
sys     0m0.066s
$ time ./test2.py 1
[(264140374.0, 53687092)]
(264140374.0, 53687092)

real    0m13.246s
user    0m13.175s
sys     0m0.060s

コード:

#!/usr/bin/python

import functools
import multiprocessing
import mmap
import os
import struct
import sys

FILE = "/tmp/random"  # dd if=/dev/random of=/tmp/random bs=1024k count=1024
SIZE = os.stat(FILE).st_size
BYTES = 2
SKIP = 10


def bymmap(poolsize, n):
    partition = SIZE/poolsize
    initial = n * partition
    end = initial + partition
    sum = 0.0
    unpacks = 0
    with open(FILE, "r") as fd:
        mm = mmap.mmap(fd.fileno(), 0, prot=mmap.PROT_READ)
        for offset in xrange(initial, end, SKIP*BYTES):
            data = mm[offset:offset+BYTES]
            sum += struct.unpack('h', data)[0]
            unpacks += 1
    return (sum, unpacks)


poolsize = int(sys.argv[1])
pool = multiprocessing.Pool(poolsize)
results = pool.map(functools.partial(bymmap, poolsize), range(0, poolsize))
print results
print reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]), results)
于 2016-06-03T21:39:22.440 に答える