9

バッファ内の最も古いデータを上書きするシングルコンシューマ/シングルコンシューマ用のリングバッファを作成するロックフリーまたは非ブロッキングの方法を作成する方法を見つけようとしています。バッファがいっぱいの場合に「falseを返す」ときに機能するロックフリーアルゴリズムをたくさん読みました。つまり、追加しないでください。しかし、最も古いデータを上書きする必要があるときにそれを行う方法について説明している疑似コードさえ見つかりません。

私は GCC 4.1.2 を使用しており (制限があり、バージョンをアップグレードできません...)、Boost ライブラリがあり、過去に独自の Atomic< T > 変数型を作成しました。今後の仕様(完全ではありませんが、スレッドセーフであり、必要なことを行います)。

考えてみると、これらのアトミックを使用すれば問題は解決するはずだと思いました。私が考えていたことに関する大まかな疑似コード:

template< typename T , unsigned int Size>
class RingBuffer {
private:
Atomic<unsigned int> readIndex;
Atomic<unsigned int> writeIndex;
enum Capacity { size = Size };
T* buf;

unsigned int getNextIndex(unsigned int i)
{
 return (i + 1 ) % size;
}

public:
RingBuffer() { //create array of size, set readIndex = writeIndex = 0 }
~RingBuffer() { //delete data }
void produce(const T& t)
{
 if(writeIndex == getNextIndex(readIndex)) //1
 {
  readIndex = getNextIndex(readIndex); //2
  }
  buf[writeIndex] = t;
  writeIndex = getNextIndex(writeIndex);  //3
}

bool consume(T& t)
{
  if(readIndex == writeIndex)  //4
   return false;
  t = buf[readIndex];  
  readIndex = getNexIndex(readIndex);  //5
  return true;
}

};

私が知る限り、ここではデッドロックの状況は発生しないので、安全です (上記の実装が疑似コードレベルでも間違っている場合は、建設的な批判が常に歓迎されます)。ただし、私が見つけることができる大きな競合状態は次のとおりです。

バッファがいっぱいであると仮定しましょう。つまり、writeIndex +1 = readIndex; (1) は、consume が呼び出されたときに発生します。そして true (4) は false であるため、バッファから読み取るように移動します (5) が発生し、readIndex が 1 つ進められます (したがって、実際には、バッファ内にスペースがあり (2) が発生し、readIndex が AGAIN を進めます。したがって、価値を失う。

基本的に、ライターがリーダーを変更しなければならず、競合状態が発生するという古典的な問題です。アクセスするたびにリスト全体を実際にブロックしない限り、これを防ぐ方法は考えられません。私は何が欠けていますか??

4

3 に答える 3

7
  1. 適切な進行が保証された単一のプロデューサー/複数のコンシューマー キューから始めます。
  2. キューがいっぱいでプッシュが失敗する場合は、1 つの値をポップします。次に、新しい値をプッシュするスペースがあります。
于 2010-12-10T05:18:49.700 に答える
1

私は何が欠けていますか??

ロット:

  • プロデューサーによって上書きされている間に消費すると言います-それをどのように検出/処理していますか?
    • 多くのオプション - たとえばdo {、値をコピーします。変更シーケンス番号などを使用して、コピーの整合性を確認してください。} while (破損しています。)
  • 原子番号を使用するだけでは十分ではありません-インデックスの増分に影響を与えるためにCASスタイルのループを使用する必要もあります(ただし、これについてはすでによく読んでいると言うので、知っていると思います)
  • 記憶の壁

しかし、それを疑似コードレベルより下にあると書き留めて、明示的な質問を考えてみましょう:

  • ポイント (5) は、実際には CAS 操作が必要になります。readIndex がconsume()(破損している可能性がある)tがコピーされる前に正しくサンプリング/コピーされた場合、プロデューサーによって既にインクリメントされている場合、CAS 命令は失敗します。通常のリサンプルとリトライ CAS の代わりに、続行します。
于 2010-12-10T05:12:01.880 に答える
0

これは、最近作成したアトミック変数の循環バッファーのコードです。false を返すのではなく、データを「オーバーライド」するように変更しました。免責事項 - まだ製品グレードのテストは行われていません。

    template<int capacity, int gap, typename T> class nonblockigcircular {
  /*
   * capacity - size of cicular buffer
   * gap - minimum safety distance between head and tail to start insertion operation
   *       generally gap should exceed number of threads attempting insertion concurrently 
   *       capacity should be gap plus desired buffer size 
   * T   - is a data type for buffer to keep
   */
  volatile T buf[capacity];  // buffer

  std::atomic<int> t, h, ph, wh; 
  /* t to h data available for reading
   * h to ph - zone where data is likely written but h is not updated yet
   *   to make sure data is written check if ph==wh 
   * ph to wh - zone where data changes in progress 
   */

  bool pop(T &pwk) {
    int td, tnd;

    do {
      int hd=h.load()%capacity;
      td=t.load()%capacity;
      if(hd==td) return false;
      tnd=(td+1)%capacity;
    } while(!t.compare_exchange_weak(td, tnd));

    pwk=buf[td];
    return true;
  }


  const int  count() {
    return ( h.load()+capacity-t.load() ) % capacity;
    }

  bool push(const T &pwk) {
    const int tt=t.load();
    int hd=h.load();

    if(  capacity - (hd+capacity-tt) % capacity < gap) {
       // Buffer is too full to insert
       // return false; 
       // or delete last record as below
       int nt=t.fetch_add(1);
       if(nt==capacity-1) t.fetch_sub(capacity);
       }


    int nwh=wh.fetch_add(1);
    if(nwh==capacity-1) wh.fetch_sub(capacity);

    buf[nwh%capacity]=pwk;

    int nph=ph.fetch_add(1);
    if(nph==capacity-1) ph.fetch_sub(capacity);

    if(nwh==nph) {
      int ohd=hd;
      while(! h.compare_exchange_weak(hd, nwh) ) {
        hd=h.load();
        if( (ohd+capacity-hd) % capacity > (ohd+capacity-nwh) % capacity ) break;
      }
    }
    return true;
  }

};
于 2016-06-17T14:31:25.037 に答える