1

リソースのセットを使用してプロデューサー/コンシューマー パターンを実装しようとしているため、各スレッドには 1 つのリソースが関連付けられています。たとえば、各タスクがStreamWriterその結果を書き込む必要があるタスクのキューがあるとします。各タスクには、パラメーターが渡される必要もあります。

私は Joseph Albahari の実装から始めました (私の修正版については以下を参照してください)。

ActionのキューをリソースのキューAction<T>に置き換え、Tスレッドに関連付けられたリソースを に渡しますAction。しかし、これにより、にパラメータを渡す方法の問題が残りますAction。明らかに、Actionをデリゲートに置き換える必要がありますが、タスクが (ProducerConsumerQueueクラスの外部から) キューに入れられたときにパラメーターを渡す方法の問題が残ります。これを行う方法についてのアイデアはありますか?

class ProducerConsumerQueue<T>
    {
        readonly object _locker = new object();            
        Thread[] _workers;
        Queue<Action<T>> _itemQ = new Queue<Action<T>>();

        public ProducerConsumerQueue(T[] resources)
        {
            _workers = new Thread[resources.Length];

            // Create and start a separate thread for each worker
            for (int i = 0; i < resources.Length; i++)
            {
                Thread thread = new Thread(() => Consume(resources[i]));
                thread.SetApartmentState(ApartmentState.STA);
                _workers[i] = thread;
                _workers[i].Start();
            }
        }        

        public void Shutdown(bool waitForWorkers)
        {
            // Enqueue one null item per worker to make each exit.
            foreach (Thread worker in _workers)
                EnqueueItem(null);

            // Wait for workers to finish
            if (waitForWorkers)
                foreach (Thread worker in _workers)
                    worker.Join();
        }

        public void EnqueueItem(Action<T> item)
        {
            lock (_locker)
            {
                _itemQ.Enqueue(item);           // We must pulse because we're
                Monitor.Pulse(_locker);         // changing a blocking condition.
            }
        }

        void Consume(T parameter)
        {
            while (true)                        // Keep consuming until
            {                                   // told otherwise.
                Action<T> item;
                lock (_locker)
                {
                    while (_itemQ.Count == 0) Monitor.Wait(_locker);
                    item = _itemQ.Dequeue();
                }
                if (item == null) return;         // This signals our exit.
                item(parameter);                           // Execute item.
            }
        }
    }
4

1 に答える 1

3

の型TProducerConsumerQueue<T>リソースである必要はありません。リソースを含む複合型にすることができます。.NET4 でこれを行う最も簡単な方法は、Tuple<StreamWriter, YourParameterType>. 生産/消費者キューは食べて吐き出すだけなTので、Action<T>プロパティを使用してリソースとパラメーターを取得できます。Tupleを使用している場合はItem1、リソースのItem2取得とパラメーターの取得に使用します。

.NET4 を使用していない場合、プロセスは似ていますが、独自のクラスを作成するだけです。

public class WorkItem<T>
{
    private StreamWriter resource;
    private T parameter;

    public WorkItem(StreamWriter resource, T parameter)
    {
        this.resource = resource;
        this.parameter = parameter;
    }

    public StreamWriter Resource { get { return resource; } }
    public T Parameter { get { return parameter; } }
}

実際、それを汎用にすることは、状況に対して過度に設計されている可能性があります。T を希望するタイプに定義するだけです。

また、参考までに、同時キューや並列タスク ライブラリなどのユース ケースに適用できる .NET4 に含まれるマルチスレッドを実行する新しい方法があります。また、セマフォなどの従来のアプローチと組み合わせることもできます。

編集:

このアプローチを続けて、使用方法を示す小さなサンプル クラスを次に示します。

  • 限られたリソースへのアクセスを制御するセマフォ
  • スレッド間でそのリソースを安全に管理するための同時キュー
  • タスク並列ライブラリを使用したタスク管理

Processorクラスは次のとおりです。

public class Processor
{
    private const int count = 3;
    private ConcurrentQueue<StreamWriter> queue = new ConcurrentQueue<StreamWriter>();
    private Semaphore semaphore = new Semaphore(count, count);

    public Processor()
    {
        // Populate the resource queue.
        for (int i = 0; i < count; i++) queue.Enqueue(new StreamWriter("sample" + i));
    }

    public void Process(int parameter)
    {
        // Wait for one of our resources to become free.
        semaphore.WaitOne();
        StreamWriter resource;
        queue.TryDequeue(out resource);

        // Dispatch the work to a task.
        Task.Factory.StartNew(() => Process(resource, parameter));
    }

    private Random random = new Random();

    private void Process(StreamWriter resource, int parameter)
    {
        // Do work in background with resource.
        Thread.Sleep(random.Next(10) * 100);
        resource.WriteLine("Parameter = {0}", parameter);
        queue.Enqueue(resource);
        semaphore.Release();
    }
}

これで、次のようにクラスを使用できます。

var processor = new Processor();
for (int i = 0; i < 10; i++)
    processor.Process(i);

同時に 3 つまでのタスクがスケジュールされ、それぞれStreamWriterがリサイクルされる独自のリソースを持ちます。

于 2011-05-14T18:54:23.780 に答える