2

私は、ロックフリーの複数のプロデューサーと複数のコンシューマーのリングバッファーで (私の試み) 頭を悩ませてきました。アイデアの基本は、unsigned char 型と unsigned short 型の固有のオーバーフローを使用し、要素バッファーをこれらの型のいずれかに固定してから、リング バッファーの先頭に自由にループバックすることです。

問題は、私のソリューションが複数のプロデューサーに対して機能しないことです (ただし、N 個のコンシューマー、および単一のプロデューサー、単一のコンシューマーに対しては機能します)。

#include <atomic>

template<typename Element, typename Index = unsigned char> struct RingBuffer
{
  std::atomic<Index> readIndex;
  std::atomic<Index> writeIndex;
  std::atomic<Index> scratchIndex;
  Element elements[1 << (sizeof(Index) * 8)];

  RingBuffer() :
    readIndex(0),
    writeIndex(0),
    scratchIndex(0)
  {
    ;
  }

  bool push(const Element & element)
  {
    while(true)
    {
      const Index currentReadIndex = readIndex.load();
      Index currentWriteIndex = writeIndex.load();
      const Index nextWriteIndex = currentWriteIndex + 1;
      if(nextWriteIndex == currentReadIndex)
      {
        return false;
      }

      if(scratchIndex.compare_exchange_strong(
        currentWriteIndex, nextWriteIndex))
      {
        elements[currentWriteIndex] = element;
        writeIndex = nextWriteIndex;
        return true;
      }
    }
  }

  bool pop(Element & element)
  {
    Index currentReadIndex = readIndex.load();

    while(true)
    {
      const Index currentWriteIndex = writeIndex.load();
      const Index nextReadIndex = currentReadIndex + 1;
      if(currentReadIndex == currentWriteIndex)
      {
        return false;
      }

      element = elements[currentReadIndex];

      if(readIndex.compare_exchange_strong(
        currentReadIndex, nextReadIndex))
      {
        return true;
      }
    }
  }
};

書き込みの主なアイデアは、疑似ロックとして機能する一時インデックス「scratchIndex」を使用して、writeIndex を更新し、他のプロデューサーが進行できるようにする前に、一度に 1 つのプロデューサーのみが要素バッファーにコピー構築できるようにすることでした。 . 私のアプローチが「ロックフリー」であることをほのめかして異教徒と呼ばれる前に、このアプローチが完全にロックフリーではないことを認識していますが、実際には (うまくいけば!) 通常のミューテックスを使用するよりもはるかに高速です!

私はここで(より複雑な)MPMCリングバッファソリューションを認識しています http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue、しかし私は実際に自分の考えを実験して比較していますそのアプローチに対して、それぞれが優れている場所を見つけます (または、実際に私のアプローチが完全に失敗するかどうか!)。

私が試したこと。

  • compare_exchange_weak の使用
  • 私が望む動作に一致するより正確な std::memory_order を使用する
  • 私が持っているさまざまなインデックス間にキャッシュライン パッドを追加する
  • Element 配列だけでなく要素を std::atomic にする

これは、アトミック アクセスを使用してミューテックスを使用する方法に関する私の頭の中の根本的なセグメンテーション違反に要約されると確信しています。私の頭の中でどのニューロンが大幅に失火しているかを指摘できる人には、心から感謝します! :)

4

2 に答える 2

2

これはABA 問題の一種です。成功したプロデューサーは次のようになります。

  1. ロードcurrentReadIndex
  2. ロードcurrentWriteIndex
  3. cmpxchgストアscratchIndex = nextWriteIndex
  4. お店element
  5. お店writeIndex = nextWriteIndex

プロデューサーが何らかの理由でステップ 2 と 3 の間で十分に長く停止した場合、他のプロデューサーがキュー全体のデータを生成し、まったく同じインデックスにラップアラウンドして、ステップ 3 の比較交換が成功する可能性があります。 (scratchIndex がたまたま currentWriteIndex と同じになるため)。

それ自体は問題ありません。失速したプロデューサーは完全にインクリメントしてキューをロックする権利を持ってscratchIndexいます — 魔法の ABA 検出 cmpxchg がストアを拒否したとしても、プロデューサーは単純に再試行し、まったく同じ をリロードし、currentWriteIndex通常どおり続行します。

実際の問題は、nextWriteIndex == currentReadIndexステップ 2 と 3 の間のチェックです。 の場合、キューは論理的に空ですcurrentReadIndex == currentWriteIndex。したがって、このチェックは、コンシューマがまだポップしていない要素を上書きするほど先に進むプロデューサがないことを確認するために存在します。currentReadIndexすべてのコンシューマが監視されたと監視された の間に「閉じ込められる」必要があるため、このチェックを最初に 1 回行うことは安全であるように思われcurrentWriteIndexます。

ただし、別のプロデューサーがやって来てwriteIndex、消費者をその罠から解放することを除いて。プロデューサがステップ 2 と 3 の間で停止した場合、ウェイクアップ時に格納されている値はreadIndex絶対に何でもかまいません。

以下は、問題が発生していることを示す、空のキューから始まる例です。

  1. Producer Aはステップ 1 と 2 を実行します。読み込まれたインデックスは両方とも 0 です。キューは空です。
  2. プロデューサー Bは要素を中断して生成します。
  3. コンシューマーが要素をポップします。両方のインデックスは 1 です。
  4. プロデューサ Bはさらに 255 個の要素を生成します。書き込みインデックスは 0 に戻りますが、読み取りインデックスは 1 のままです。
  5. プロデューサーAが眠りから覚める。以前に読み取りインデックスと書き込みインデックスの両方を 0 (空のキュー!) としてロードしていたので、ステップ 3 を試みます。他のプロデューサが偶然インデックス 0 で一時停止したため、比較交換は成功し、ストアが進行します。完了すると、プロデューサーは を許可writeIndex = 1し、保存されている両方のインデックスが 1 になり、キューは論理的に空になります。完全なキューに相当する要素が完全に無視されるようになりました。

(「ストール」と「ウェイクアップ」について話さなくて済む唯一の理由は、使用されるすべてのアトミックが順次一貫しているため、シングルスレッド環境にいるふりをすることができるということです。)


scratchIndex同時書き込みを保護するために使用している方法は、本質的にロックであることに注意してください。cmpxchg を正常に完了した人は誰でも、ロックが解放されるまで、キューへの完全な書き込みアクセスを取得します。この障害を修正する最も簡単な方法はscratchIndex、スピンロックに置き換えることです。ABA の影響を受けることはなく、実際に起こっていることです。

于 2014-09-01T16:05:14.660 に答える
1
 bool push(const Element & element)
  {
    while(true)
    {
      const Index currentReadIndex = readIndex.load();
      Index currentWriteIndex = writeIndex.load();
      const Index nextWriteIndex = currentWriteIndex + 1;
      if(nextWriteIndex == currentReadIndex)
      {
        return false;
      }

      if(scratchIndex.compare_exchange_strong(
        currentWriteIndex, nextWriteIndex))
      {
        elements[currentWriteIndex] = element;
        // Problem here!
        writeIndex = nextWriteIndex;
        return true;
      }
    }
  }

問題のある箇所をマークしました。複数のスレッドが同時に writeIndex = nextWriteIndex に到達する可能性があります。各書き込みはアトミックですが、データは任意の順序で書き込まれます。

同じアトミック条件を使用して 2 つの値を更新しようとしているため、これは問題です。通常、これは不可能です。メソッドの残りの部分に問題がないと仮定すると、これを回避する 1 つの方法は、scratchIndex と writeIndex の両方を double-size の単一の値に結合することです。たとえば、2 つの uint32_t 値を 1 つの uint64_t 値として扱い、それに対してアトミックに操作します。

于 2014-09-01T12:17:33.803 に答える