15

プログラムでParallel.ForEachを使用しているときに、一部のスレッドが終了していないように見えることがわかりました。実際、それは新しいスレッドを何度も生成し続けました。これは、私が予期していなかった動作であり、絶対に望まない動作です。

私の「実際の」プログラムと同じように、プロセッサとメモリの両方を多く使用する次のコード(.NET 4.0コード)を使用して、この動作を再現することができました。

public class Node
{
    public Node Previous { get; private set; }

    public Node(Node previous)
    {
        Previous = previous;
    }
}

public class Program
{
    public static void Main(string[] args)
    {
        DateTime startMoment = DateTime.Now;
        int concurrentThreads = 0;

        var jobs = Enumerable.Range(0, 2000);
        Parallel.ForEach(jobs, delegate(int jobNr)
        {
            Interlocked.Increment(ref concurrentThreads);

            int heavyness = jobNr % 9;

            //Give the processor and the garbage collector something to do...
            List<Node> nodes = new List<Node>();
            Node current = null;
            for (int y = 0; y < 1024 * 1024 * heavyness; y++)
            {
                current = new Node(current);
                nodes.Add(current);
            }

            TimeSpan elapsed = DateTime.Now - startMoment;
            int threadsRemaining = Interlocked.Decrement(ref concurrentThreads);
            Console.WriteLine("[{0:mm\\:ss}] Job {1,4} complete. {2} threads remaining.", elapsed, jobNr, threadsRemaining);
        });
    }
}

私のクアッドコアで実行すると、予想どおり、最初は4つの同時スレッドで開始されます。ただし、時間の経過とともに、ますます多くのスレッドが作成されています。最終的に、このプログラムはOutOfMemoryExceptionをスローします。

[00:00] Job    0 complete. 3 threads remaining.
[00:01] Job    1 complete. 4 threads remaining.
[00:01] Job    2 complete. 4 threads remaining.
[00:02] Job    3 complete. 4 threads remaining.
[00:05] Job    9 complete. 5 threads remaining.
[00:05] Job    4 complete. 5 threads remaining.
[00:05] Job    5 complete. 5 threads remaining.
[00:05] Job   10 complete. 5 threads remaining.
[00:08] Job   11 complete. 5 threads remaining.
[00:08] Job    6 complete. 5 threads remaining.
...
[00:55] Job   67 complete. 7 threads remaining.
[00:56] Job   81 complete. 8 threads remaining.
...
[01:54] Job  107 complete. 11 threads remaining.
[02:00] Job  121 complete. 12 threads remaining.
..
[02:55] Job  115 complete. 19 threads remaining.
[03:02] Job  166 complete. 21 threads remaining.
...
[03:41] Job  113 complete. 28 threads remaining.
<OutOfMemoryException>

上記の実験のメモリ使用量グラフは次のとおりです。

プロセッサとメモリの使用量

スクリーンショットはオランダ語です。上の部分はプロセッサの使用量、下の部分はメモリの使用量を表しています。)ご覧のとおり、ガベージコレクタが邪魔になるたびに新しいスレッドが生成されているようです(ご覧のとおり) 。メモリ使用量の低下)。

なぜこれが起こっているのか、そして私がそれについて何ができるのかを誰かが説明できますか?.NETで新しいスレッドの生成を停止し、既存のスレッドを最初に終了させたいだけです...

4

3 に答える 3

17

プロパティセットを使用してParallelOptionsインスタンスを指定することにより、作成されるスレッドの最大数を制限できます。MaxDegreeOfParallelism

var jobs = Enumerable.Range(0, 2000);
ParallelOptions po = new ParallelOptions
{ 
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

Parallel.ForEach(jobs, po, jobNr =>
{
    // ...
});

観察している動作が得られる理由について:TPL (PLINQの基礎となる)は、デフォルトで、使用するスレッドの最適な数を自由に推測できます。並列タスクがブロックされるたびに、タスクスケジューラは進行状況を維持するために新しいスレッドを作成する場合があります。あなたの場合、ブロッキングは暗黙的に発生している可能性があります。たとえば、Console.WriteLine呼び出しを通じて、または(観察したように)ガベージコレクション中に。

タスク並列ライブラリを使用した同時実行レベルの調整から(使用するスレッドの数は?)

TPLのデフォルトポリシーはプロセッサごとに1つのスレッドを使用することであるため、TPLは最初、タスクのワークロードが最大100%稼働し、0%待機していると想定し、初期想定が失敗してタスクが待機状態になると結論付けることができます(つまり、ブロックを開始します)-TPLは、必要に応じてスレッドを追加する自由を取ります。

于 2012-12-26T10:16:56.930 に答える
7

おそらく、タスクスケジューラがどのように機能するかについて少し読む必要があります。

http://msdn.microsoft.com/en-us/library/ff963549.aspx(ページの後半)

「.NETスレッドプールは、プール内のワーカースレッドの数を自動的に管理します。組み込みのヒューリスティックに従ってスレッドを追加および削除します。.NETスレッドプールには、スレッドを挿入するための2つの主要なメカニズムがあります。ワーカーを追加する飢餓回避メカニズムです。キューに入れられたアイテムで進行が見られない場合はスレッド、および可能な限り少ないスレッドを使用しながらスループットを最大化しようとするヒルクライミングヒューリスティック。

飢餓回避の目標は、デッドロックを防ぐことです。この種のデッドロックは、ワーカースレッドが同期イベントを待機しているときに発生する可能性があります。同期イベントは、スレッドプールのグローバルキューまたはローカルキューでまだ​​保留中のワークアイテムによってのみ満たされます。ワーカースレッドの数が固定されていて、それらのスレッドがすべて同様にブロックされている場合、システムはそれ以上進行できなくなります。新しいワーカースレッドを追加すると、問題が解決します。

山登りヒューリスティックの目標は、スレッドがI / Oまたはプロセッサを停止させるその他の待機条件によってブロックされた場合に、コアの使用率を向上させることです。デフォルトでは、管理対象スレッドプールにはコアごとに1つのワーカースレッドがあります。これらのワーカースレッドの1つがブロックされると、コンピューターの全体的なワークロードによっては、コアが十分に活用されていない可能性があります。スレッドインジェクションロジックは、ブロックされたスレッドと、プロセッサを集中的に使用する長時間の操作を実行しているスレッドを区別しません。したがって、スレッドプールのグローバルキューまたはローカルキューに保留中の作業項目が含まれている場合は常に、実行に長い時間(0.5秒以上)かかるアクティブな作業項目が新しいスレッドプールワーカースレッドの作成をトリガーする可能性があります。」

タスクをLongRunningとしてマークすることはできますが、これには、スレッドプールの外部からタスクにスレッドを割り当てるという副作用があります。これは、タスクをインライン化できないことを意味します。

ParallelForは、与えられた作業をブロックとして扱うため、1つのループ内の作業がかなり小さい場合でも、ルックによって呼び出されたタスクによって実行される全体的な作業がスケジューラーに長く表示される場合があることに注意してください

GCへのほとんどの呼び出しとそれら自体はブロックされていません(別のスレッドで実行されます)が、GCが完了するのを待つと、ブロックされます。また、GCがメモリを再配置しているため、GCの実行中にメモリを割り当てようとすると、いくつかの副作用(およびブロック)が発生する可能性があることにも注意してください。ここには詳細はありませんが、この理由から、PPLには特に同時メモリ管理用のメモリ割り当て機能がいくつかあることは知っています。

コードの出力を見ると、何秒も実行されているようです。ですから、スレッドインジェクションが表示されているのは驚きではありません。ただし、デフォルトのスレッドプールサイズは約30スレッドであることを覚えているようです(おそらくシステムのコア数によって異なります)。コードが割り当てられる前にスレッドが約MBのメモリを消費するため、ここでメモリ不足の例外が発生する理由がわかりません。

于 2013-01-16T20:41:29.820 に答える
1

フォローアップの質問「.NETアプリケーションの同時スレッド数をカウントするにはどうすればよいですか?」を投稿しました。

スレッドを直接カウントする場合、Parallel.For()内のスレッドの数は、ほとんど((非常にまれに、わずかに減少します)増加するだけで、ループの完了後に解放されません。

リリースモードとデバッグモードの両方でこれをチェックしました。

ParallelOptions po = new ParallelOptions
{
  MaxDegreeOfParallelism = Environment.ProcessorCount
};

となし

数字は異なりますが、結論は同じです。

誰かが遊んでみたいと思ったら、これが私が使っていた準備ができたコードです:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Edit4Posting
{
public class Node
{

  public Node Previous { get; private set; }
  public Node(Node previous)
  {
    Previous = previous;
    }
  }
  public class Edit4Posting
  {

    public static void Main(string[] args)
    {
      int concurrentThreads = 0;
      int directThreadsCount = 0;
      int diagThreadCount = 0;

      var jobs = Enumerable.Range(0, 160);
      ParallelOptions po = new ParallelOptions
      {
        MaxDegreeOfParallelism = Environment.ProcessorCount
      };
      Parallel.ForEach(jobs, po, delegate(int jobNr)
      //Parallel.ForEach(jobs, delegate(int jobNr)
      {
        int threadsRemaining = Interlocked.Increment(ref concurrentThreads);

        int heavyness = jobNr % 9;

        //Give the processor and the garbage collector something to do...
        List<Node> nodes = new List<Node>();
        Node current = null;
        //for (int y = 0; y < 1024 * 1024 * heavyness; y++)
        for (int y = 0; y < 1024 * 24 * heavyness; y++)
        {
          current = new Node(current);
          nodes.Add(current);
        }
        //*******************************
        directThreadsCount = Process.GetCurrentProcess().Threads.Count;
        //*******************************
        threadsRemaining = Interlocked.Decrement(ref concurrentThreads);
        Console.WriteLine("[Job {0} complete. {1} threads remaining but directThreadsCount == {2}",
          jobNr, threadsRemaining, directThreadsCount);
      });
      Console.WriteLine("FINISHED");
      Console.ReadLine();
    }
  }
}
于 2013-03-15T06:45:54.867 に答える