リソースのセットを使用してプロデューサー/コンシューマー パターンを実装しようとしているため、各スレッドには 1 つのリソースが関連付けられています。たとえば、各タスクがStreamWriter
その結果を書き込む必要があるタスクのキューがあるとします。各タスクには、パラメーターが渡される必要もあります。
私は Joseph Albahari の実装から始めました (私の修正版については以下を参照してください)。
Action
のキューをリソースのキューAction<T>
に置き換え、T
スレッドに関連付けられたリソースを に渡しますAction
。しかし、これにより、にパラメータを渡す方法の問題が残りますAction
。明らかに、Action
をデリゲートに置き換える必要がありますが、タスクが (ProducerConsumerQueue
クラスの外部から) キューに入れられたときにパラメーターを渡す方法の問題が残ります。これを行う方法についてのアイデアはありますか?
class ProducerConsumerQueue<T>
{
readonly object _locker = new object();
Thread[] _workers;
Queue<Action<T>> _itemQ = new Queue<Action<T>>();
public ProducerConsumerQueue(T[] resources)
{
_workers = new Thread[resources.Length];
// Create and start a separate thread for each worker
for (int i = 0; i < resources.Length; i++)
{
Thread thread = new Thread(() => Consume(resources[i]));
thread.SetApartmentState(ApartmentState.STA);
_workers[i] = thread;
_workers[i].Start();
}
}
public void Shutdown(bool waitForWorkers)
{
// Enqueue one null item per worker to make each exit.
foreach (Thread worker in _workers)
EnqueueItem(null);
// Wait for workers to finish
if (waitForWorkers)
foreach (Thread worker in _workers)
worker.Join();
}
public void EnqueueItem(Action<T> item)
{
lock (_locker)
{
_itemQ.Enqueue(item); // We must pulse because we're
Monitor.Pulse(_locker); // changing a blocking condition.
}
}
void Consume(T parameter)
{
while (true) // Keep consuming until
{ // told otherwise.
Action<T> item;
lock (_locker)
{
while (_itemQ.Count == 0) Monitor.Wait(_locker);
item = _itemQ.Dequeue();
}
if (item == null) return; // This signals our exit.
item(parameter); // Execute item.
}
}
}