私は学術的なオープン ソース プロジェクトに取り組んでおり、C# で高速ブロッキング FIFO キューを作成する必要があります。私の最初の実装では、単純に同期キュー (動的拡張を使用) をリーダーのセマフォ内にラップしました。
public class FastFifoQueue<T>
{
private T[] _array;
private int _head, _tail, _count;
private readonly int _capacity;
private readonly Semaphore _readSema, _writeSema;
/// <summary>
/// Initializes FastFifoQueue with the specified capacity
/// </summary>
/// <param name="size">Maximum number of elements to store</param>
public FastFifoQueue(int size)
{
//Check if size is power of 2
//Credit: http://stackoverflow.com/questions/600293/how-to-check-if-a-number-is-a-power-of-2
if ((size & (size - 1)) != 0)
throw new ArgumentOutOfRangeException("size", "Size must be a power of 2 for this queue to work");
_capacity = size;
_array = new T[size];
_count = 0;
_head = int.MinValue; //0 is the same!
_tail = int.MinValue;
_readSema = new Semaphore(0, _capacity);
_writeSema = new Semaphore(_capacity, _capacity);
}
public void Enqueue(T item)
{
_writeSema.WaitOne();
int index = Interlocked.Increment(ref _head);
index %= _capacity;
if (index < 0) index += _capacity;
//_array[index] = item;
Interlocked.Exchange(ref _array[index], item);
Interlocked.Increment(ref _count);
_readSema.Release();
}
public T Dequeue()
{
_readSema.WaitOne();
int index = Interlocked.Increment(ref _tail);
index %= _capacity;
if (index < 0) index += _capacity;
T ret = Interlocked.Exchange(ref _array[index], null);
Interlocked.Decrement(ref _count);
_writeSema.Release();
return ret;
}
public int Count
{
get
{
return _count;
}
}
}
これは、教科書にある静的配列を使用した古典的な FIFO キューの実装です。ポインターをアトミックにインクリメントするように設計されており、到達したときにポインターをゼロに戻すことはできないため (容量-1)、モジュロを別々に計算します。理論的には、Interlocked を使用することは、インクリメントを行う前にロックすることと同じです。セマフォがあるため、複数のプロデューサー/コンシューマーがキューに入る可能性がありますが、キュー ポインターを変更できるのは一度に 1 つだけです。まず、Interlocked.Increment は最初にインクリメントしてから戻るため、ポスト インクリメント値を使用して、配列内の位置 1 からストア項目を開始することに制限されていることを既に理解しています。問題ありません、一定の値になったら0に戻ります
それの何が問題なのですか?高負荷で実行していると、キューが NULL 値を返すことがあることは信じられないでしょう。繰り返しますが、キューにnullをエンキューするメソッドはないと確信しています。念のために Enqueue に null チェックを入れてみましたが、エラーは発生しませんでした。そのためのテストケースをVisual Studioで作成しました(ちなみに私はまぁぁぁぁぁぁぁぁぁぁぁぁぁぁぁぁぁぁぁぁぁんのようにデュアルコアCPU使ってます)
private int _errors;
[TestMethod()]
public void ConcurrencyTest()
{
const int size = 3; //Perform more tests changing it
_errors = 0;
IFifoQueue<object> queue = new FastFifoQueue<object>(2048);
Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;
Thread[] producers = new Thread[size], consumers = new Thread[size];
for (int i = 0; i < size; i++)
{
producers[i] = new Thread(LoopProducer) { Priority = ThreadPriority.BelowNormal };
consumers[i] = new Thread(LoopConsumer) { Priority = ThreadPriority.BelowNormal };
producers[i].Start(queue);
consumers[i].Start(queue);
}
Thread.Sleep(new TimeSpan(0, 0, 1, 0));
for (int i = 0; i < size; i++)
{
producers[i].Abort();
consumers[i].Abort();
}
Assert.AreEqual(0, _errors);
}
private void LoopProducer(object queue)
{
try
{
IFifoQueue<object> q = (IFifoQueue<object>)queue;
while (true)
{
try
{
q.Enqueue(new object());
}
catch
{ }
}
}
catch (ThreadAbortException)
{ }
}
private void LoopConsumer(object queue)
{
try
{
IFifoQueue<object> q = (IFifoQueue<object>)queue;
while (true)
{
object item = q.Dequeue();
if (item == null) Interlocked.Increment(ref _errors);
}
}
catch (ThreadAbortException)
{ }
}
コンシューマ スレッドが null を取得すると、エラーがカウントされます。1 つのプロデューサーと 1 つのコンシューマーでテストを実行すると、成功します。2 つのプロデューサと 2 つのコンシューマ、またはそれ以上でテストを実行すると、2000 のリークが検出されるという惨事が発生します。Enqueue メソッドに問題がある可能性があることがわかりました。設計上の契約により、プロデューサーは空 ( null )のセルにのみ書き込むことができますが、コードをいくつかの診断で変更すると、プロデューサーが空でないセルに書き込みを試みていることがわかりました。 " データ。
public void Enqueue(T item)
{
_writeSema.WaitOne();
int index = Interlocked.Increment(ref _head);
index %= _capacity;
if (index < 0) index += _capacity;
//_array[index] = item;
T leak = Interlocked.Exchange(ref _array[index], item);
//Diagnostic code
if (leak != null)
{
throw new InvalidOperationException("Too bad...");
}
Interlocked.Increment(ref _count);
_readSema.Release();
}
「ひどい」例外が頻繁に発生します。しかし、インクリメントはアトミックであり、ライターのセマフォは空き配列セルと同じ数のライターしか許可しないため、同時書き込みから競合が発生するのは奇妙です。
誰かがそれを手伝ってくれますか?あなたのスキルと経験を私と共有していただければ幸いです。
ありがとうございました。