4

スティーブン・トーブの本の88ページ

http://www.microsoft.com/download/en/details.aspx?id=19222

コードがあります

private BlockingCollection<T> _streamingData = new BlockingCollection<T>();
// Parallel.ForEach
Parallel.ForEach(_streamingData.GetConsumingEnumerable(),
item => Process(item));
// PLINQ
var q = from item in _streamingData.GetConsumingEnumerable().AsParallel()
...
select item;

スティーブンはそれから言及します

「GetConsumingEnumerableをデータソースとして呼び出した結果をParallel.ForEachに渡す場合、コレクションが空になると、ループで使用されるスレッドがブロックされる可能性があります。ブロックされたスレッドは、Parallel.ForEachによって解放されない場合があります。このように、上記のコードでは、コレクションが空である期間がある場合、プロセスのスレッド数が着実に増加する可能性があります。」

なぜスレッド数が増えるのかわかりませんか?

コレクションが空の場合、blockingcollectionはそれ以上のスレッドを要求しませんか?

したがって、BlockingCollectionで使用されるスレッドの数を制限するためにWithDegreeOfParallelismを実行する必要はありません。

4

1 に答える 1

3

スレッドプールには、適切なスレッド数を推定するために使用する山登りアルゴリズムがあります。スレッドを追加するとスループットが向上する限り、スレッドプールはより多くのスレッドを作成します。何らかのブロッキングまたはIOが発生したと想定し、システム内のプロセッサの数を超えてCPUを飽和させようとします。

そのため、スレッドプールスレッドでIOを実行したり、ブロックしたりするのは危険な場合があります。

上記の動作の完全に機能する例を次に示します。

        BlockingCollection<string> _streamingData = new BlockingCollection<string>();

        Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 100; i++)
                {
                    _streamingData.Add(i.ToString());
                    Thread.Sleep(100);
                }
            });

        new Thread(() =>
            {
                while (true)
                {
                    Thread.Sleep(1000);
                    Console.WriteLine("Thread count: " + Process.GetCurrentProcess().Threads.Count);
                }
            }).Start();

        Parallel.ForEach(_streamingData.GetConsumingEnumerable(), item =>
            {
            });

スループットは向上しませんが、スレッド数が増え続ける理由はわかりません。私が説明したモデルによると、それは成長しません。しかし、私のモデルが実際に正しいかどうかはわかりません。

スレッドプールには、進行状況がまったく見られない場合にスレッドを生成するヒューリスティックが追加されている可能性があります(1秒あたりに完了したタスクで測定)。これは、アプリケーションでの多くのデッドロックを防ぐ可能性が高いため、理にかなっています。重要なタスクが既存のタスクが終了してスレッドを使用可能にするのを待っているために実行できない場合、デッドロックが発生する可能性があります。これは、スレッドプールに関するよく知られた問題です。

于 2012-01-28T16:19:35.520 に答える