これは、 を使用して比較的簡単に解決できますBlockingCollection<T>
。
1 つをキューとして使用し、それへの参照をproducer()
および のそれぞれに渡すことができますconsumers()
。
GetConsumingEnumerable()
各コンシューマー スレッドから呼び出し、 foreach
.
プロデューサー スレッドはアイテムをコレクションに追加し、アイテムのCompleteAdding()
生成が完了すると呼び出します。これにより、すべてのコンシューマ スレッドが自動的に foreach ループを終了します。
以下は基本的な例です (エラー処理なし)。への呼び出しThread.Sleep()
は負荷をシミュレートするためのものであり、実際のコードでは使用しないでください。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
internal class Program
{
private static void Main(string[] args)
{
ThreadPool.SetMinThreads(10, 0); // To help the demo; not needed in real code.
var plant = new ProcessingPlant();
plant.Process();
Console.WriteLine("Work complete.");
}
}
public sealed class ProcessingPlant
{
private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
public void Process()
{
Parallel.Invoke(producer, consumers);
}
private void producer()
{
for (int i = 0; i < 100; ++i)
{
string item = i.ToString();
Console.WriteLine("Producer is queueing {0}", item);
_queue.Add(item); // <- Here's where we add an item to the queue.
Thread.Sleep(0);
}
_queue.CompleteAdding(); // <- Here's where we make all the consumers
} // exit their foreach loops.
private void consumers()
{
Parallel.Invoke(
() => consumer(1),
() => consumer(2),
() => consumer(3),
() => consumer(4),
() => consumer(5)
);
}
private void consumer(int id)
{
Console.WriteLine("Consumer {0} is starting.", id);
foreach (var item in _queue.GetConsumingEnumerable()) // <- Here's where we remove items.
{
Console.WriteLine("Consumer {0} read {1}", id, item);
Thread.Sleep(0);
}
Console.WriteLine("Consumer {0} is stopping.", id);
}
}
}
(コンシューマーを開始するためだけに余分なスレッドを使用していることはわかっていますが、BlockingCollection の使用方法を示すという本当のポイントが不明瞭になるのを避けるために、このようにしました。)