7

tcpを介して[EEG]データを受信し、それをリングバッファに書き込むクライアントを開発しています。バッファをctypesまたはnumpy配列として持つと、そのようなバッファの任意の場所にnumpyの「ビュー」を作成し、コピー操作なしでデータを読み取り/書き込み/処理できるため、非常に便利だと思いました。それとも一般的に悪い考えですか?

ただし、この方法で固定サイズの循環バッファを実装する方法がわかりません。メモリ内で隣接するバッファオブジェクトを作成したとします。バッファの終わりに達したときにデータを書き込むための最良の方法は何ですか?

考えられる1つの方法は、書き込みポインタがバッファアレイの最後に到達したときに、最初から(すでに古い)バイトの上書きを開始することです。ただし、境界の近くでは、一部のチャンク(処理用)のnumpyビューを作成できません(または作成できますか?)。これは、一部のチャンクがまだバッファー配列の最後にあり、別のチャンクがすでにあるためです。その始まり。私はそのような円形のスライスを作成することは不可能であることを読みました。これを解決する方法は?

UPD:回答ありがとうございます。誰かが同じ問題に直面した場合に備えて、これが私が持っている最終的なコードです

4

4 に答える 4

7

Nバイトのウィンドウが必要な場合は、バッファを2 * Nバイトにして、すべての入力を次の2つの場所に書き込みます。i % Nおよびi % N + N、ここiで、はバイトカウンタです。そうすれば、バッファには常にN個の連続したバイトがあります。

data = 'Data to buffer'
N = 4
buf = 2*N*['\00']

for i,c in enumerate(data):
    j = i % N
    buf[j] = c
    buf[j+N] = c
    if i >= N-1:
        print ''.join(buf[j+1:j+N+1]) 

プリント

Data
ata 
ta t
a to
 to 
to b
o bu
 buf
buff
uffe
ffer
于 2012-01-18T17:54:09.660 に答える
2

考えられる1つの方法は、書き込みポインタがバッファアレイの最後に到達したときに、最初から(すでに古い)バイトの上書きを開始することです。

これが、固定サイズのリングバッファの唯一のオプションです。

私はそのような円形のスライスを作成することは不可能であることを読みました。

そのため、Numpyビューではこれを行いません。代わりに、バッファー/配列、容量、および挿入ポイントへのポインター(インデックス)を保持して、classラッパーを作成できます。ndarrayコンテンツをNumpy配列として取得する場合は、次のようにコピーを作成する必要があります。

buf = np.array([1,2,3,4])
indices = [3,0,1,2]
contents = buf[indices]    # copy

__setitem__とを実装すれば、要素の値をインプレースで設定できます__setslice__

于 2012-01-18T11:22:09.510 に答える
2

ここでCスタイルの考え方から一歩後退する必要があると思います。挿入ごとにリングバッファを更新することは、決して効率的ではありません。リングバッファは、numpy配列が要求する連続したメモリブロックインターフェイスとは根本的に異なります。あなたがやりたいと言っているfftを含みます。

自然な解決策は、パフォーマンスのためにメモリを少し犠牲にすることです。たとえば、バッファに保持する必要のある要素の数がNの場合、N + 1024(または適切な数)の配列を割り当てます。次に、1024回の挿入ごとにN個の要素を移動するだけで済み、直接利用可能なN個の要素の連続したビューが常に表示されます。

編集:これは上記を実装するコードスニペットであり、優れたパフォーマンスを提供するはずです。ただし、要素ごとではなく、チャンクで追加することをお勧めします。そうしないと、リングバッファの実装方法に関係なく、numpyを使用することによるパフォーマンス上の利点がすぐに無効になります。

import numpy as np

class RingBuffer(object):
    def __init__(self, size, padding=None):
        self.size = size
        self.padding = size if padding is None else padding
        self.buffer = np.zeros(self.size+self.padding)
        self.counter = 0

    def append(self, data):
        """this is an O(n) operation"""
        data = data[-self.padding:]
        n = len(data)
        if self.remaining < n: self.compact()
        self.buffer[self.counter+self.size:][:n] = data
        self.counter += n

    @property
    def remaining(self):
        return self.padding-self.counter
    @property
    def view(self):
        """this is always an O(1) operation"""
        return self.buffer[self.counter:][:self.size]
    def compact(self):
        """
        note: only when this function is called, is an O(size) performance hit incurred,
        and this cost is amortized over the whole padding space
        """
        print 'compacting'
        self.buffer[:self.size] = self.view
        self.counter = 0

rb = RingBuffer(10)
for i in range(4):
    rb.append([1,2,3])
    print rb.view

rb.append(np.arange(15))
print rb.view  #test overflow
于 2014-07-20T20:40:07.943 に答える
-1

@Janne Karilaの回答の変形で、Cの
場合、numpyではありません。リングバッファがN x 1Gのように非常に広い場合は、全体を2倍にする代わりに、その行への2*Nポインタの配列を2倍にします。たとえば、N = 3の場合、初期化します

bufp = { buf[0], buf[1], buf[2], buf[0], buf[1], buf[2] };

次に、データを1回だけ書き込みanyfunc( bufp[j:j+3] )、行bufを時間順に表示します。

于 2014-07-20T10:00:15.807 に答える