0

単一のソースがデータを入力する一種のキューが必要であり、反対側には、キューが空でないことを検出すると、停止するまでデータの実行を開始するのを待っている消費者がいます。ただし、キューが空になった場合でもキューを監視し続け、さらにデータがポップインされた場合にそれを消費できるようにすることが重要です。コンシューマーがプロデューサーにネストされているため、複数のコンシューマーと複数のプロデューサーによって見つけたもの。したがって、直列ではなく、消費者と生産者の両方が並行して実行されます。

コンシューマとプロデューサを並行して実行します

Parallel.Invoke(() => producer(), () => consumers());

そのような問題は、時々空のキューの内容を並行して実行する方法です

4

1 に答える 1

0

これは、 を使用して比較的簡単に解決できますBlockingCollection<T>

1 つをキューとして使用し、それへの参照をproducer()および のそれぞれに渡すことができますconsumers()

GetConsumingEnumerable()各コンシューマー スレッドから呼び出し、 foreach.

プロデューサー スレッドはアイテムをコレクションに追加し、アイテムのCompleteAdding()生成が完了すると呼び出します。これにより、すべてのコンシューマ スレッドが自動的に foreach ループを終了します。

以下は基本的な例です (エラー処理なし)。への呼び出しThread.Sleep()は負荷をシミュレートするためのものであり、実際のコードでは使用しないでください。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            ThreadPool.SetMinThreads(10, 0); // To help the demo; not needed in real code.
            var plant = new ProcessingPlant();
            plant.Process();
            Console.WriteLine("Work complete.");
        }
    }

    public sealed class ProcessingPlant
    {
        private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();

        public void Process()
        {
            Parallel.Invoke(producer, consumers);
        }

        private void producer()
        {
            for (int i = 0; i < 100; ++i)
            {
                string item = i.ToString();
                Console.WriteLine("Producer is queueing {0}", item);
               _queue.Add(item);  // <- Here's where we add an item to the queue.
                Thread.Sleep(0);
            }

            _queue.CompleteAdding(); // <- Here's where we make all the consumers
        }                            //    exit their foreach loops.

        private void consumers()
        {
            Parallel.Invoke(
                () => consumer(1),
                () => consumer(2),
                () => consumer(3),
                () => consumer(4),
                () => consumer(5)
            );
        }

        private void consumer(int id)
        {
            Console.WriteLine("Consumer {0} is starting.", id);

            foreach (var item in _queue.GetConsumingEnumerable()) // <- Here's where we remove items.
            {
                Console.WriteLine("Consumer {0} read {1}", id, item);
                Thread.Sleep(0);
            }

            Console.WriteLine("Consumer {0} is stopping.", id);
        }
    }
}

(コンシューマーを開始するためだけに余分なスレッドを使用していることはわかっていますが、BlockingCollection の使用方法を示すという本当のポイントが不明瞭になるのを避けるために、このようにしました。)

于 2013-03-30T00:57:29.437 に答える