18

任意のサイズのバイト チャンクを先頭に効率的に追加し、末尾から任意のサイズのバイト チャンクを効率的にポップできる FIFO バッファを実装するにはどうすればよいですか?

バックグラウンド:

ファイルのようなオブジェクトから任意のサイズのチャンクでバイトを読み取るクラスがあり、それ自体がクライアントが任意のサイズのチャンクでバイトを読み取ることができるファイルのようなオブジェクトです。

私がこれを実装した方法は、クライアントがバイトのチャンクを読みたいときはいつでも、クラスが基礎となるファイルのようなオブジェクトから (それらのオブジェクトに適切なチャンク サイズで) 繰り返し読み取り、そのバイトを FIFO の先頭に追加することです。要求されたサイズのチャンクをクライアントに提供するのに十分なバイトがキューにあるまでキューに入れます。次に、それらのバイトをキューの末尾からポップして、クライアントに返します。

基になるファイルのようなオブジェクトのチャンク サイズが、クライアントがクラスから読み取るときに使用するチャンク サイズよりもはるかに大きい場合に発生するパフォーマンスの問題があります。

基盤となるファイルのようなオブジェクトのチャンク サイズが 1 MiB で、クライアントが読み取るチャンク サイズが 1 KiB であるとします。クライアントが初めて 1 KiB を要求したとき、クラスは 1 MiB を読み取って FIFO キューに追加する必要があります。次に、そのリクエストと後続の 1023 リクエストに対して、クラスは FIFO キューの末尾から 1 KiB をポップする必要があります。これは、サイズが 1 MiB から 0 バイトに徐々に減少し、その時点でサイクルが再び開始されます。

現在、これを StringIO オブジェクトで実装しています。StringIO オブジェクトの最後に新しいバイトを書き込むのは高速ですが、最初からバイトを削除するのは非常に遅くなります。これは、前のバッファー全体から最初のバイトのチャンクを差し引いたコピーを保持する新しい StringIO オブジェクトを作成する必要があるためです。

同様の問題を扱う SO の質問は、deque コンテナーを指す傾向があります。ただし、deque は双方向リンク リストとして実装されます。チャンクを両端キューに書き込むには、チャンクをオブジェクトに分割する必要があり、それぞれに 1 バイトが含まれます。次に、deque は、格納するために各オブジェクトに 2 つのポインターを追加します。おそらく、バイト数と比較して、メモリ要件が少なくとも 1 桁増加します。また、リンクされたリストをトラバースし、各オブジェクトを処理して、チャンクをオブジェクトに分割し、オブジェクトをチャンクに結合するには、長い時間がかかります。

4

4 に答える 4

16

現在、これを StringIO オブジェクトで実装しています。StringIO オブジェクトの最後に新しいバイトを書き込むのは高速ですが、最初からバイトを削除するのは非常に遅くなります。これは、前のバッファー全体から最初のバイトのチャンクを差し引いたコピーを保持する新しい StringIO オブジェクトを作成する必要があるためです。

実際、FIFO を実装する最も一般的な方法は、2 つのポインターを使用して 2 つのラップアラウンド バッファーを使用することです。

ここに画像の説明を入力 画像ソース

これで、適切な場所からの読み取り/書き込みをStringIO()使用して実装できます。.seek()

于 2012-06-06T16:05:00.507 に答える
12

更新: vartec の回答からの循環バッファー手法の実装を次に示します(私の元の回答に基づいており、好奇心旺盛な人のために以下に保存されています)。

from cStringIO import StringIO

class FifoFileBuffer(object):
    def __init__(self):
        self.buf = StringIO()
        self.available = 0    # Bytes available for reading
        self.size = 0
        self.write_fp = 0

    def read(self, size = None):
        """Reads size bytes from buffer"""
        if size is None or size > self.available:
            size = self.available
        size = max(size, 0)

        result = self.buf.read(size)
        self.available -= size

        if len(result) < size:
            self.buf.seek(0)
            result += self.buf.read(size - len(result))

        return result


    def write(self, data):
        """Appends data to buffer"""
        if self.size < self.available + len(data):
            # Expand buffer
            new_buf = StringIO()
            new_buf.write(self.read())
            self.write_fp = self.available = new_buf.tell()
            read_fp = 0
            while self.size <= self.available + len(data):
                self.size = max(self.size, 1024) * 2
            new_buf.write('0' * (self.size - self.write_fp))
            self.buf = new_buf
        else:
            read_fp = self.buf.tell()

        self.buf.seek(self.write_fp)
        written = self.size - self.write_fp
        self.buf.write(data[:written])
        self.write_fp += len(data)
        self.available += len(data)
        if written < len(data):
            self.write_fp -= self.size
            self.buf.seek(0)
            self.buf.write(data[written:])
        self.buf.seek(read_fp)

元の回答(上記のものに取って代わられました):

バッファーを使用して開始インデックス (ファイル ポインターの読み取り) を追跡し、サイズが大きくなりすぎたときに圧縮することができます (これにより、かなり優れた償却パフォーマンスが得られるはずです)。

たとえば、次のように StringIO オブジェクトをラップします。

from cStringIO import StringIO
class FifoBuffer(object):
    def __init__(self):
        self.buf = StringIO()

    def read(self, *args, **kwargs):
        """Reads data from buffer"""
        self.buf.read(*args, **kwargs)

    def write(self, *args, **kwargs):
        """Appends data to buffer"""
        current_read_fp = self.buf.tell()
        if current_read_fp > 10 * 1024 * 1024:
            # Buffer is holding 10MB of used data, time to compact
            new_buf = StringIO()
            new_buf.write(self.buf.read())
            self.buf = new_buf
            current_read_fp = 0

        self.buf.seek(0, 2)    # Seek to end
        self.buf.write(*args, **kwargs)

        self.buf.seek(current_read_fp)
于 2012-06-06T15:55:23.470 に答える
4

予想される読み取り/書き込み量について何か推測できますか?

たとえば、データを 1024 バイトのフラグメントに分割し、deque[1] を使用すると、より適切に機能する場合があります。N 個の完全なチャンクを読み取ってから、最後の 1 つのチャンクを分割して残りをキューの先頭に戻すことができます。

1) collections.deque

class collections.deque([iterable[, maxlen]])

左から右に (append() を使用して) 初期化された新しい両端キュー オブジェクトを iterable からのデータで返します。iterable が指定されていない場合、新しい両端キューは空です。

Deque は、スタックとキューを一般化したものです (名前は「デッキ」と発音され、「ダブルエンド キュー」の略です)。Deques は、deque の両側からのスレッドセーフでメモリ効率の高い追加とポップをサポートし、どちらの方向でもほぼ同じ O(1) パフォーマンスを実現します。...

于 2012-06-06T15:50:54.990 に答える