32

私は最近、プロデューサー/コンシューマーパターンのc#実装に出くわしました。それはとてもシンプルで(少なくとも私にとっては)とてもエレガントです。

2006年頃に考案されたようですので、この実装は
-安全
-まだ適用可能かどうか疑問に思いました

コードは以下のとおりです(元のコードはhttp://bytes.com/topic/net/answers/575276-producer-consumer#post2251375で参照されていました)

using System;  
using System.Collections;  
using System.Threading;

public class Test
{  
    static ProducerConsumer queue;

    static void Main()
    {
        queue = new ProducerConsumer();
        new Thread(new ThreadStart(ConsumerJob)).Start();

        Random rng = new Random(0);
        for (int i=0; i < 10; i++)
        {
            Console.WriteLine ("Producing {0}", i);
            queue.Produce(i);
            Thread.Sleep(rng.Next(1000));
        }
    }

    static void ConsumerJob()
    {
        // Make sure we get a different random seed from the
        // first thread
        Random rng = new Random(1);
        // We happen to know we've only got 10 
        // items to receive
        for (int i=0; i < 10; i++)
        {
            object o = queue.Consume();
            Console.WriteLine ("\t\t\t\tConsuming {0}", o);
            Thread.Sleep(rng.Next(1000));
        }
    }
}

public class ProducerConsumer
{
    readonly object listLock = new object();
    Queue queue = new Queue();

    public void Produce(object o)
    {
        lock (listLock)
        {
            queue.Enqueue(o);

            // We always need to pulse, even if the queue wasn't
            // empty before. Otherwise, if we add several items
            // in quick succession, we may only pulse once, waking
            // a single thread up, even if there are multiple threads
            // waiting for items.            
            Monitor.Pulse(listLock);
        }
    }

    public object Consume()
    {
        lock (listLock)
        {
            // If the queue is empty, wait for an item to be added
            // Note that this is a while loop, as we may be pulsed
            // but not wake up before another thread has come in and
            // consumed the newly added object. In that case, we'll
            // have to wait for another pulse.
            while (queue.Count==0)
            {
                // This releases listLock, only reacquiring it
                // after being woken up by a call to Pulse
                Monitor.Wait(listLock);
            }
            return queue.Dequeue();
        }
    }
}
4

5 に答える 5

38

コードはそれよりも古いです-.NET2.0が出る前に私はそれを書きました。プロデューサー/コンシューマーキューの概念はそれよりもはるか古いです:)

はい、そのコードは私が知る限り安全ですが、いくつかの欠点があります。

  • それは一般的ではありません。現代のバージョンは確かに一般的です。
  • キューを停止する方法はありません。キューを停止する(すべてのコンシューマースレッドがリタイアするようにする)簡単な方法の1つは、キューに入れることができる「作業停止」トークンを用意することです。次に、スレッドと同じ数のトークンを追加します。または、停止することを示す別のフラグがあります。(これにより、キュー内の現在のすべての作業を終了する前に、他のスレッドを停止できます。)
  • ジョブが非常に小さい場合、一度に1つのジョブを消費することは、最も効率的な方法ではない場合があります。

正直なところ、コードの背後にある考え方は、コード自体よりも重要です。

于 2009-11-01T08:04:33.070 に答える
32

次のコード スニペットのようなことができます。これは一般的であり、null (または使用したい任意のフラグ) をキューに入れ、ワーカー スレッドに終了するように指示する方法があります。

コードはここから取得されます: http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ConsoleApplication1
{

    public class TaskQueue<T> : IDisposable where T : class
    {
        object locker = new object();
        Thread[] workers;
        Queue<T> taskQ = new Queue<T>();

        public TaskQueue(int workerCount)
        {
            workers = new Thread[workerCount];

            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (workers[i] = new Thread(Consume)).Start();
        }

        public void Dispose()
        {
            // Enqueue one null task per worker to make each exit.
            foreach (Thread worker in workers) EnqueueTask(null);
            foreach (Thread worker in workers) worker.Join();
        }

        public void EnqueueTask(T task)
        {
            lock (locker)
            {
                taskQ.Enqueue(task);
                Monitor.PulseAll(locker);
            }
        }

        void Consume()
        {
            while (true)
            {
                T task;
                lock (locker)
                {
                    while (taskQ.Count == 0) Monitor.Wait(locker);
                    task = taskQ.Dequeue();
                }
                if (task == null) return;         // This signals our exit
                Console.Write(task);
                Thread.Sleep(1000);              // Simulate time-consuming task
            }
        }
    }
}
于 2009-11-01T08:29:36.730 に答える
23

当時、私は上記のコードとそれが由来する一連の記事から、Monitor.Wait / Pulseがどのように機能するか(そして一般的にスレッドについて多くのこと)を学びました。ジョンが言うように、それはそれに多くの価値があり、確かに安全で適用可能です。

ただし、.NET 4の時点では、フレームワークに生産者/消費者キューの実装があります。自分で見つけただけですが、これまでは必要なことはすべてやってくれます。

于 2012-05-02T14:10:43.763 に答える