2

私は学術的なオープン ソース プロジェクトに取り組んでおり、C# で高速ブロッキング FIFO キューを作成する必要があります。私の最初の実装では、単純に同期キュー (動的拡張を使用) をリーダーのセマフォ内にラップしました。

public class FastFifoQueue<T>
{
    private T[] _array;
    private int _head, _tail, _count;
    private readonly int _capacity;
    private readonly Semaphore _readSema, _writeSema;

    /// <summary>
    /// Initializes FastFifoQueue with the specified capacity
    /// </summary>
    /// <param name="size">Maximum number of elements to store</param>
    public FastFifoQueue(int size)
    {
        //Check if size is power of 2
        //Credit: http://stackoverflow.com/questions/600293/how-to-check-if-a-number-is-a-power-of-2
        if ((size & (size - 1)) != 0)
            throw new ArgumentOutOfRangeException("size", "Size must be a power of 2 for this queue to work");

        _capacity = size;
        _array = new T[size];
        _count = 0;
        _head = int.MinValue; //0 is the same!
        _tail = int.MinValue;

        _readSema = new Semaphore(0, _capacity);
        _writeSema = new Semaphore(_capacity, _capacity);
    }

    public void Enqueue(T item)
    {
        _writeSema.WaitOne();
        int index = Interlocked.Increment(ref _head);
        index %= _capacity;
        if (index < 0) index += _capacity;
        //_array[index] = item;
        Interlocked.Exchange(ref _array[index], item);
        Interlocked.Increment(ref _count);
        _readSema.Release();
    }

    public T Dequeue()
    {
        _readSema.WaitOne();
        int index = Interlocked.Increment(ref _tail);
        index %= _capacity;
        if (index < 0) index += _capacity;
        T ret = Interlocked.Exchange(ref _array[index], null);
        Interlocked.Decrement(ref _count);
        _writeSema.Release();

        return ret;
    }

    public int Count
    {
        get
        {
            return _count;
        }
    }
}

これは、教科書にある静的配列を使用した古典的な FIFO キューの実装です。ポインターをアトミックにインクリメントするように設計されており、到達したときにポインターをゼロに戻すことはできないため (容量-1)、モジュロを別々に計算します。理論的には、Interlocked を使用することは、インクリメントを行う前にロックすることと同じです。セマフォがあるため、複数のプロデューサー/コンシューマーがキューに入る可能性がありますが、キュー ポインターを変更できるのは一度に 1 つだけです。まず、Interlocked.Increment は最初にインクリメントしてから戻るため、ポスト インクリメント値を使用して、配列内の位置 1 からストア項目を開始することに制限されていることを既に理解しています。問題ありません、一定の値になったら0に戻ります

それの何が問題なのですか?高負荷で実行していると、キューが NULL 値を返すことがあることは信じられないでしょう。繰り返しますが、キューにnullをエンキューするメソッドはないと確信しています。念のために Enqueue に null チェックを入れてみましたが、エラーは発生しませんでした。そのためのテストケースをVisual Studioで作成しました(ちなみに私はまぁぁぁぁぁぁぁぁぁぁぁぁぁぁぁぁぁぁぁぁぁんのようにデュアルコアCPU使ってます)

    private int _errors;

    [TestMethod()]
    public void ConcurrencyTest()
    {
        const int size = 3; //Perform more tests changing it
        _errors = 0;
        IFifoQueue<object> queue = new FastFifoQueue<object>(2048);
        Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;
        Thread[] producers = new Thread[size], consumers = new Thread[size];

        for (int i = 0; i < size; i++)
        {
            producers[i] = new Thread(LoopProducer) { Priority = ThreadPriority.BelowNormal };
            consumers[i] = new Thread(LoopConsumer) { Priority = ThreadPriority.BelowNormal };
            producers[i].Start(queue);
            consumers[i].Start(queue);
        }

        Thread.Sleep(new TimeSpan(0, 0, 1, 0));

        for (int i = 0; i < size; i++)
        {
            producers[i].Abort();
            consumers[i].Abort();
        }

        Assert.AreEqual(0, _errors);
    }

    private void LoopProducer(object queue)
    {
        try
        {
            IFifoQueue<object> q = (IFifoQueue<object>)queue;
            while (true)
            {
                try
                {
                    q.Enqueue(new object());
                }
                catch
                { }

            }
        }
        catch (ThreadAbortException)
        { }
    }

    private void LoopConsumer(object queue)
    {
        try
        {
            IFifoQueue<object> q = (IFifoQueue<object>)queue;
            while (true)
            {
                object item = q.Dequeue();
                if (item == null) Interlocked.Increment(ref _errors);
            }
        }
        catch (ThreadAbortException)
        { }

    }

コンシューマ スレッドが null を取得すると、エラーがカウントされます。1 つのプロデューサーと 1 つのコンシューマーでテストを実行すると、成功します。2 つのプロデューサと 2 つのコンシューマ、またはそれ以上でテストを実行すると、2000 のリークが検出されるという惨事が発生します。Enqueue メソッドに問題がある可能性があることがわかりました。設計上の契約により、プロデューサーは空 ( null )のセルにのみ書き込むことができますが、コードをいくつかの診断で変更すると、プロデューサーが空でないセルに書き込みを試みていることがわかりました。 " データ。

    public void Enqueue(T item)
    {
        _writeSema.WaitOne();
        int index = Interlocked.Increment(ref _head);
        index %= _capacity;
        if (index < 0) index += _capacity;
        //_array[index] = item;
        T leak = Interlocked.Exchange(ref _array[index], item);

        //Diagnostic code
        if (leak != null)
        {
            throw new InvalidOperationException("Too bad...");
        }
        Interlocked.Increment(ref _count);

        _readSema.Release();
    }

「ひどい」例外が頻繁に発生します。しかし、インクリメントはアトミックであり、ライターのセマフォは空き配列セルと同じ数のライターしか許可しないため、同時書き込みから競合が発生するのは奇妙です。

誰かがそれを手伝ってくれますか?あなたのスキルと経験を私と共有していただければ幸いです。

ありがとうございました。

4

3 に答える 3

6

言わなければならないのは、これは非常に賢いアイデアだと思い、バグがどこにあるのか (私が思うに) に気づき始める前に、しばらく考えました。ですから、一方で、このような巧妙なデザインを思いついたことを称賛します! しかし、同時に、 「カーニハンの法則」を証明したことを恥じてください。

デバッグは、最初にコードを書くよりも 2 倍難しい作業です。したがって、コードをできるだけ賢く書いたとしても、定義上、それをデバッグするほど賢くはありません。

問題は基本的に次のとおりです。WaitOneandRelease呼び出しがすべてのand操作を効果的にシリアル化すると想定しています。しかし、それはここで起こっていることではありません。クラスは、イベントの特定の順序を保証するためではなく、リソースにアクセスするスレッドの数を制限するために使用されることに注意してください。それぞれの との間で何が起こるかは、との呼び出し自体と同じ「スレッド順序」で発生するとは限りません。EnqueueDequeueSemaphoreWaitOneReleaseWaitOneRelease

これは言葉で説明するのは難しいので、視覚的な図を示してみましょう。

キューの容量が 8 で、次のようになっているとします (オブジェクトを0表現nullしてx表現します)。

[ xxxxxxxx ]

SoEnqueueは 8 回呼び出され、キューがいっぱいです。したがって、_writeSemaセマフォは でブロックされWaitOne_readSemaセマフォは ですぐに戻りますWaitOne

ここで、Dequeueが 3 つの異なるスレッドで多かれ少なかれ同時に呼び出されたとします。これらを T1、T2、T3 と呼びましょう。

Dequeue先に進む前に、参照用にいくつかのラベルを実装に適用させてください。

public T Dequeue()
{
    _readSema.WaitOne();                                   // A
    int index = Interlocked.Increment(ref _tail);          // B
    index %= _capacity;
    if (index < 0) index += _capacity;
    T ret = Interlocked.Exchange(ref _array[index], null); // C
    Interlocked.Decrement(ref _count);
    _writeSema.Release();                                  // D

    return ret;
}

OK、T1、T2、T3 はすべて点Aを通過しました。次に、簡単にするために、それぞれが「順番に」ラインBに到達し、T1 のindexが 0、T2 のindexが 1、T3 のindexが 2 であるとします。

ここまでは順調ですね。ここから、T1、T2、および T3 が任意の指定された順序で回線Dに到達するという保証はありません。T3 が実際にT1 および T2よりも先に進み、ラインCを通過して(したがって に設定_array[2]されnull)、ラインDまで移動するとします。

この時点の後、_writeSemaシグナルが送信されます。つまり、キューに書き込み用のスロットが 1 つあるということですよね? しかし、キューは次のようになります。

[ xx 0 xxxxx ]

そのため、その間に別のスレッドが の呼び出しを行った場合、スロット 0 が空でなくてもEnqueue、実際には を通過 _writeSema.WaitOneし、 をインクリメントして0_headを取得します。この結果、T1 (彼を覚えていますか?) が読み取る前に、スロット 0 のアイテムが実際に上書きされる可能性があります。index

自分の価値観がどこから来ているのかを理解するにnullは、先ほど説明したプロセスの逆を視覚化するだけで済みます。つまり、キューが次のようになっているとします。

[ 0 0 0 0 0 0 0 ]

T1、T2、および T3 の 3 つのスレッドはすべて、Enqueueほぼ同時に呼び出します。T3は_head 最後にインクリメントしますが、その項目を (で) 挿入し、最初_array[2]にを呼び出します。その結果、信号は送信されますが、キューは次のようになります。_readSema.Release _readSema

[ 0 0 x 0 0 0 0 0 ]

したがって、その間に別のスレッドが への呼び出しをDequeue行った場合 (T1 と T2 が処理を終了する前に)、スロット 0空であっても_readSema.WaitOne、 を通過し、 をインクリメントして0_tailを取得します。index

だからあなたの問題があります。解決策については、現時点で提案はありません。考え直す時間をください...(この回答を投稿しているのは、心に新鮮であり、役立つかもしれないと感じているためです。)

于 2010-10-10T02:42:23.443 に答える
3

(私が投票したDan Taoに+1が答えを持っています)エンキューは次のように変更されます...

while (Interlocked.CompareExchange(ref _array[index], item, null) != null)
    ;

デキューは次のように変更されます...

while( (ret = Interlocked.Exchange(ref _array[index], null)) == null)
    ;

これは、Dan Tao の優れた分析に基づいています。インデックスはアトミックに取得されるため、(enqueue メソッドまたは dequeue メソッドでスレッドが停止または終了しないと仮定すると) リーダーは最終的にセルに値が入力されることが保証され、ライターは最終的にセルが解放される (null) ことが保証されます。

于 2010-10-10T03:40:14.433 に答える
2

ダン・タオとレス、ありがとう。

大変お世話になりました。ダン、あなたは私の心を開いた:クリティカルセクション内にいくつの生産者/消費者がいるかは重要ではありません.重要なのは、ロックが順番に解放されることです. Les、あなたは問題の解決策を見つけました。

いよいよ、お二人のご協力のおかげで作成した最終的なコードを使用して、私自身の質問に答える時が来ました。まあ、大したことではありませんが、Les のコードから少し強化したものです

エンキュー:

while (Interlocked.CompareExchange(ref _array[index], item, null) != null)
            Thread.Sleep(0);

デキュー:

while ((ret = Interlocked.Exchange(ref _array[index], null)) == null)
            Thread.Sleep(0);

Thread.Sleep(0) の理由 要素を取得/保存できないことがわかった場合、なぜすぐにもう一度チェックするのでしょうか? 他のスレッドが読み書きできるようにするには、コンテキストの切り替えを強制する必要があります。明らかに、スケジュールされる次のスレッドは、操作できない別のスレッドになる可能性がありますが、少なくとも強制します。ソース: http://progfeatures.blogspot.com/2009/05/how-to-force-thread-to-perform-context.html

また、私の主張を証明するために、前のテスト ケースのコードもテストしました。

睡眠なし(0)

Read 6164150 elements
Wrote 6322541 elements
Read 5885192 elements
Wrote 5785144 elements
Wrote 6439924 elements
Read 6497471 elements

睡眠付き(0)

Wrote 7135907 elements
Read 6361996 elements
Wrote 6761158 elements
Read 6203202 elements
Wrote 5257581 elements
Read 6587568 elements

私はこれが「偉大な」発見ではないことを知っており、これらの数値に対してチューリング賞を受賞するつもりはありません. パフォーマンスの増加は劇的ではありませんが、ゼロより大きいです。コンテキストの切り替えを強制すると、より多くの RW 操作を実行できます (= スループットが向上します)。

明確にするために:私のテストでは、プロデューサー/コンシューマーの問題をシミュレートするのではなく、キューのパフォーマンスを評価するだけです。しかし、皆さんのおかげで、私のアプローチが機能することを実証しました。

MS-RL として利用可能なオープン ソースのコード: http://logbus-ng.svn.sourceforge.net/viewvc/logbus-ng/trunk/logbus-core/It.Unina.Dis.Logbus/Utils/FastFifoQueue.cs?revision =461&view=マークアップ

于 2010-10-12T22:28:23.203 に答える