2

バイト ストリームを転送する必要があり、1 つのプロデューサー スレッドと 1 つのコンシューマー スレッドがあります。ほとんどの場合、プロデューサーの速度はコンシューマーよりも高速であり、アプリケーションの QoS のために十分なバッファー データが必要です。私の問題について読んだところ、共有バッファー、PipeStream .NET クラスなどの解決策があります...このクラスはサーバー上で何度もインスタンス化されるため、最適化された解決策が必要です。ByteArray の Queue を使用するのは良い考えですか?

はいの場合、最適化アルゴリズムを使用してキューのサイズと各 ByteArray の容量を推測し、理論的には私のケースに適合します。

いいえの場合、最善のアプローチは何ですか?

C# または VB での ByteArray Queue のロックフリーでスレッドセーフな実装があれば教えてください。

前もって感謝します

4

6 に答える 6

3

バイトごとに生成および消費する代わりに、チャンクで作業する場合、おそらくはるかに高速化されます。その場合、コードの「ロックフリー」はおそらくまったく問題になりません。実際、従来のロックソリューションの方が望ましい場合があります。実演してみます。

C# では、ロックのない、単一のプロデューサー、単一のコンシューマー、制限付きキューが提供されます。(リスト A)
難解なインターロック操作はなく、明示的なメモリ バリアさえありません。一見したところ、これは最高に高速でロックフリーであるとしましょう。ではない?ここで、Marc Gravell が
提供したロック ソリューションと比較してみましょう。

コア間で共有 L3 キャッシュを持たないデュアル CPU マシンを使用します。最大で 2 倍の高速化が期待されます。2 倍のスピードアップは、ロックフリー ソリューションが理論上の限界で理想的に機能することを意味します。
ロックフリー コードの理想的な環境を作るために、ここからユーティリティ クラスを使用して、プロデューサー スレッドとコンシューマー スレッドの CPU アフィニティも設定します。
テストの結果のコードを (リスト B) に示します。

それはcaを生産しています。あるスレッドで 10MBytes を別のスレッドで消費します。
キューのサイズは 32KBytes に固定されています。満杯の場合、プロデューサーは待機します。
私のマシンでの典型的なテストの実行は次のようになります。

LockFreeByteQueue: 799ms
ByteQueue: 1843ms

ロックフリー キューは高速です。うわー、それは2倍以上の速さです!それは自慢できることです。:)
何が起こっているのか見てみましょう。Marc のロッキング キューはまさにそれを行います。ロックします。バイトごとにこれを行います。

バイトごとにロックして、データをバイトごとにプッシュする必要は本当にあるのでしょうか? ネットワーク上にチャンクで到着することはほぼ間違いありません (約 1k パケットのように)。内部ソースから実際にバイトごとに到着したとしても、プロデューサーは簡単にそれを適切なチャンクにパッケージ化できます。
それをやってみましょう - バイトごとに生成して消費する代わりに、チャンクで作業し、他の 2 つのテストをマイクロベンチマークに追加しましょう (リスト C、ベンチマーク本体に挿入するだけです)。
典型的な実行は次のようになります。

LockFreePageQueue: 33ms
PageQueue: 25ms

現在、どちらも実際には元のロックフリー コードよりも 20 倍高速です。チャンクを追加したMarc のソリューションは、チャンキングを使用したロックフリー コードよりも実際に高速です。
2 倍のスピードアップをもたらすロックのない構造を採用する代わりに、ロックで問題なく機能する別のソリューションを試みた結果、20 倍 (!) のスピードアップが得られました。
多くの問題の鍵は、ロックを回避することではなく、共有を回避してロックを最小限に抑えることです。上記の場合、バイトコピー中は共有を避けることができます。
ほとんどの場合、プライベート構造で作業してから、単一のポインターをキューに入れることができます。これにより、共有スペースと時間をキューへの単一のポインターの単一の挿入に縮小できます。

リスト A、ロックフリー、単一プロデューサー、単一コンシューマー キュー:

public class BoundedSingleProducerSingleConsumerQueue<T>
{
    T[] queue;
    volatile int tail;
    volatile int head;

    public BoundedSingleProducerSingleConsumerQueue(int capacity)
    {
        queue = new T[capacity + 1];
        tail = head = 0;
    }

    public bool TryEnqueue(T item)
    {
        int newtail = (tail + 1) % queue.Length;
        if (newtail == head) return false;
        queue[tail] = item;
        tail = newtail;
        return true;
    }

    public bool TryDequeue(out T item)
    {
        item = default(T);
        if (head == tail) return false;
        item = queue[head];
        queue[head] = default(T);
        head = (head + 1) % queue.Length;
        return true;
    }
}

リスト B、マイクロベンチマーク:

class Program
{
    static void Main(string[] args)
    {
        for (int numtrials = 3; numtrials > 0; --numtrials)
        {
            using (ProcessorAffinity.BeginAffinity(0))
            {
                int pagesize = 1024 * 10;
                int numpages = 1024;
                int totalbytes = pagesize * numpages;

                BoundedSingleProducerSingleConsumerQueue<byte> lockFreeByteQueue = new BoundedSingleProducerSingleConsumerQueue<byte>(1024 * 32);
                Stopwatch sw = new Stopwatch();
                sw.Start();
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    using (ProcessorAffinity.BeginAffinity(1))
                    {
                        for (int i = 0; i < totalbytes; i++)
                        {
                            while (!lockFreeByteQueue.TryEnqueue((byte)(i & 0xFF))) ;
                        }
                    }
                });
                for (int i = 0; i < totalbytes; i++)
                {
                    byte tmp;
                    while (!lockFreeByteQueue.TryDequeue(out tmp)) ;
                }
                sw.Stop();
                Console.WriteLine("LockFreeByteQueue: {0}ms", sw.ElapsedMilliseconds);


                SizeQueue<byte> byteQueue = new SizeQueue<byte>(1024 * 32);
                sw.Reset();
                sw.Start();
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    using (ProcessorAffinity.BeginAffinity(1))
                    {
                        for (int i = 0; i < totalbytes; i++)
                        {
                            byteQueue.Enqueue((byte)(i & 0xFF));
                        }
                    }
                });

                for (int i = 0; i < totalbytes; i++)
                {
                    byte tmp = byteQueue.Dequeue();
                }
                sw.Stop();
                Console.WriteLine("ByteQueue: {0}ms", sw.ElapsedMilliseconds);

                Console.ReadKey();
            }
        }
    }
}

リスト C、チャンク テスト:

BoundedSingleProducerSingleConsumerQueue<byte[]> lockfreePageQueue = new BoundedSingleProducerSingleConsumerQueue<byte[]>(32);
sw.Reset();
sw.Start();
ThreadPool.QueueUserWorkItem(delegate(object state)
{
    using (ProcessorAffinity.BeginAffinity(1))
    {
        for (int i = 0; i < numpages; i++)
        {
            byte[] page = new byte[pagesize];
            for (int j = 0; j < pagesize; j++)
            {
                page[j] = (byte)(i & 0xFF);
            }
            while (!lockfreePageQueue.TryEnqueue(page)) ;
        }
    }
});
for (int i = 0; i < numpages; i++)
{
    byte[] page;
    while (!lockfreePageQueue.TryDequeue(out page)) ;
    for (int j = 0; j < pagesize; j++)
    {
        byte tmp = page[j];
    }
}
sw.Stop();
Console.WriteLine("LockFreePageQueue: {0}ms", sw.ElapsedMilliseconds);

SizeQueue<byte[]> pageQueue = new SizeQueue<byte[]>(32);

ThreadPool.QueueUserWorkItem(delegate(object state)
{
    using (ProcessorAffinity.BeginAffinity(1))
    {
        for (int i = 0; i < numpages; i++)
        {
            byte[] page = new byte[pagesize];
            for (int j = 0; j < pagesize; j++)
            {
                page[j] = (byte)(i & 0xFF);
            }
            pageQueue.Enqueue(page);
        }
    }
});
sw.Reset();
sw.Start();
for (int i = 0; i < numpages; i++)
{
    byte[] page = pageQueue.Dequeue();
    for (int j = 0; j < pagesize; j++)
    {
        byte tmp = page[j];
    }
}
sw.Stop();
Console.WriteLine("PageQueue: {0}ms", sw.ElapsedMilliseconds);
于 2010-04-11T20:25:25.403 に答える
2

.NET 4にはSystem.Collections.Concurrent.Queue<T>、これらのものが可能な限りロックフリーであるものがあります(まだ一般的ですが)。

于 2010-04-10T11:51:15.793 に答える
2

Dr. Dobbsは C++でロックフリー キューを実装しましたが、これは比較的簡単に C# に採用できます。これは、プロデューサが 1 つだけある場合に機能します (コンシューマはいくつあってもかまいません)。

基本的な考え方は、移動可能なヘッドとテールの参照とともに、基になる構造として双方向リンク リストを使用することです。アイテムが生成されると、最後に追加され、リストの先頭と現在の「先頭」の間のすべてが削除されます。消費するには、頭を上に動かしてみてください。末尾に当たった場合は失敗し、そうでない場合は成功して新しい要素を返します。操作の特定の順序により、本質的にスレッドセーフになります。

ただし、ここでこのような「ロックフリー」設計を使用すると、2 つの大きな問題があります。

  1. キュー サイズに上限を強制する方法はありません。これは、プロデューサーがコンシューマーよりも高速な場合に深刻な問題になる可能性があります。

  2. 設計上、Consume何も生成されていない場合、メソッドは単に要素の取得に失敗する必要があります。つまり、コンシューマ用に独自のロックを実装する必要があり、そのようなロックは常にビジー待機 (パフォーマンス スペクトルでのロックよりもはるかに悪い) または時限待機 (コンシューマをさらに遅くする) のいずれかです。

これらの理由から、ロックフリー構造が本当に必要かどうかを真剣に検討することをお勧めします。多くの人が、ロックを使用した同等の構造よりも「高速」になると考えてこのサイトにアクセスしますが、ほとんどのアプリケーションの実際の違いはごくわずかであるため、通常は複雑さを追加する価値はありません。待機状態 (または警告可能な待機) はビジー待機よりもはるかに安価であるため、パフォーマンスが低下します。

マルチコア マシンとメモリ バリアの必要性により、効果的なロックフリー スレッドはさらに複雑になります。通常の操作では、依然として順不同で実行される可能性があり、.NET ではジッタがさらに命令の順序を変更する可能性があるため、コードにvolatile変数とThread.MemoryBarrier呼び出しを追加する必要があり、これもロックの作成に寄与する可能性があります。 -無料バージョンは、基本的な同期バージョンよりも高価です。

最初に単純な古い同期されたプロデューサー/コンシューマー キューを使用し、アプリケーションをプロファイリングして、パフォーマンス要件を満たすことができるかどうかを判断してはどうでしょうか? Joseph Albahari のサイトには、優れた効率的な PC キューの実装があります。または、Richard が言及しているように、.NET 4.0 フレームワークを使用している場合は、単純にConcurrentQueueまたはBlockingCollectionを使用できます。

最初にテストします - 実装が簡単な同期キューのロード テストを行い、実際にロックに費やされる時間を監視します。とにかくしなければならないのですが、実際にロックを取得して解放した後、それらがシグナル状態になりました。それがプログラムの実行時間の 1% を超えているとしたら、私は非常に驚かれることでしょう。しかし、そうである場合は、ロックフリーの実装を検討し始めてください。実際にパフォーマンスが向上していることを確認するために、それらの実装も確実にプロファイリングしてください。

于 2010-04-10T15:18:00.460 に答える
1

ここではスロットリングが重要です。その音から、この雑誌の記事の BoundedBuffer クラスは法案に適合しています。同様のクラスが、BlockingCollection クラスとして .NET 4.0 で利用できるようになります。バッファサイズの調整はあなた次第です。

于 2010-04-10T13:24:43.993 に答える
0

最も重要な部分は、共有オブジェクトの設計です。私のシナリオでは、リーダーとライターは別々のバッファー(ビッグデータチャンク)を個別に使用でき、キューなどの共有FIFOオブジェクトへのアクセスのみを同期する必要があります。このようにして、ロック時間が最小限に抑えられ、スレッドはジョブを並行して完了することができます。そして、.NET framewok 4.0を使用すると、この概念の実装が簡単になります。

System.Collections.Concurrent名前空間にはConcurrentQueue(Of T)クラスがあり、arrayByteは私のシナリオのキュータイプとして使用するのに適したタイプです。名前空間には他にもスレッドセーフなコレクションがあります。

http://msdn.microsoft.com/en-us/library/system.collections.concurrent.aspx

于 2010-06-05T19:40:33.197 に答える
0

Julian M BucknallはC#で1つ書いています。

于 2010-04-11T10:40:59.653 に答える