コンシューマーのタイムアウトをサポートするコンカレント プロデューサー/コンシューマー コレクション (複数のプロデューサーとコンシューマー) を実装しようとしています。
現在、実際のコレクションはかなり複雑です (残念ながら System.Collections.Concurrent には何もありません) が、私の問題を示す最小限のサンプルがあります (少し似ていますBlockingCollection<T>
)。
public sealed class ProducerConsumerQueueDraft<T>
{
private readonly Queue<T> queue = new Queue<T>();
private readonly object locker = new object();
public void Enqueue(T item)
{
lock (locker)
{
queue.Enqueue(item);
/* This "optimization" is broken, as Nicholas Butler points out.
if(queue.Count == 1) // Optimization
*/
Monitor.Pulse(locker); // Notify any waiting consumer threads.
}
}
public T Dequeue(T item)
{
lock (locker)
{
// Surprisingly, this needs to be a *while* and not an *if*
// which is the core of my problem.
while (queue.Count == 0)
Monitor.Wait(locker);
return queue.Dequeue();
}
}
// This isn't thread-safe, but is how I want TryDequeue to look.
public bool TryDequeueDesired(out T item, TimeSpan timeout)
{
lock (locker)
{
if (queue.Count == 0 && !Monitor.Wait(locker, timeout))
{
item = default(T);
return false;
}
// This is wrong! The queue may be empty even though we were pulsed!
item = queue.Dequeue();
return true;
}
}
// Has nasty timing-gymnastics I want to avoid.
public bool TryDequeueThatWorks(out T item, TimeSpan timeout)
{
lock (locker)
{
var watch = Stopwatch.StartNew();
while (queue.Count == 0)
{
var remaining = timeout - watch.Elapsed;
if (!Monitor.Wait(locker, remaining < TimeSpan.Zero ? TimeSpan.Zero : remaining))
{
item = default(T);
return false;
}
}
item = queue.Dequeue();
return true;
}
}
}
アイデアは簡単です: 空のキューを見つけた消費者は通知されるのを待ち、生産者Pulse
(注:PulseAll
非効率的です) は待っているアイテムを通知します。
私の問題は、次のプロパティです Monitor.Pulse
。
Pulse を呼び出したスレッドがロックを解放すると、準備完了キュー内の次のスレッド(必ずしもパルスされたスレッドとは限りません)がロックを取得します。
これが意味することは、コンシューマ スレッド C1 がプロデューサー スレッドによって起動されてアイテムを消費する可能性があるが、別のコンシューマ スレッド C2 が C1 がロックを再取得する機会を得る前にロックを取得し、アイテムを消費して、C1 に制御が与えられたときの空のキュー。
つまり、キューが実際に空でない場合はパルスごとに消費者コードを防御的にチェックインし、そうでない場合は戻って手ぶらで待機する必要があります。
これに関する私の主な問題は、非効率的であるということです。スレッドは、作業を行うために起動され、すぐに送り返されて再び待機する可能性があります。これに関連TryDequeue
して、 をタイムアウト付きで実装するTryDequeueThatWorks
ことは、洗練されている必要がある ( を参照) 場合に、不必要に難しく、非効率的です ( を参照TryDequeueDesired
)。
どうすればMonitor.Pulse
自分のやりたいことをひねることができますか? あるいは、そうする別の同期プリミティブはありますか? TryDequeue
私が行ったことよりもタイムアウトを実装するためのより効率的かつ/またはエレガントな方法はありますか?
参考までに、これが私の希望するソリューションの問題を示すテストです。
var queue = new ProducerConsumerQueueDraft<int>();
for (int consumer = 0; consumer < 3; consumer++)
new Thread(() =>
{
while (true)
{
int item;
// This call should occasionally throw an exception.
// Switching to queue.TryDequeueThatWorks should make
// the problem go away.
if (queue.TryDequeueDesired(out item, TimeSpan.FromSeconds(1)))
{
// Do nothing.
}
}
}).Start();
Thread.Sleep(1000); // Let consumers get up and running
for (int itemIndex = 0; itemIndex < 50000000; itemIndex++)
{
queue.Enqueue(0);
}