8

C#TPLを使用していますが、プロデューサー/コンシューマーコードに問題があります...何らかの理由で、TPLはスレッドを再利用せず、停止せずに新しいスレッドを作成し続けます

この動作を示す簡単な例を作成しました。

class Program
{
    static BlockingCollection<int> m_Buffer = new BlockingCollection<int>(1);
    static CancellationTokenSource m_Cts = new CancellationTokenSource();

    static void Producer()
    {
        try
        {
            while (!m_Cts.IsCancellationRequested)
            {
                Console.WriteLine("Enqueuing job");
                m_Buffer.Add(0);
                Thread.Sleep(1000);
            }
        }
        finally
        {
            m_Buffer.CompleteAdding();
        }
    }

    static void Consumer()
    {
        Parallel.ForEach(m_Buffer.GetConsumingEnumerable(), Run);
    }

    static void Run(int i)
    {
        Console.WriteLine
            ("Job Processed\tThread: {0}\tProcess Thread Count: {1}",
              Thread.CurrentThread.ManagedThreadId, 
              Process.GetCurrentProcess().Threads.Count);
    }

    static void Main(string[] args)
    {
        Task producer = new Task(Producer);
        Task consumer = new Task(Consumer);
        producer.Start();
        consumer.Start();

        Console.ReadKey();
        m_Cts.Cancel();

        Task.WaitAll(producer, consumer);
    }
}

このコードは、プロデューサーとコンシューマーの2つのタスクを作成します。Producesは毎秒1つの作業項目を追加し、Consumerは情報を含む文字列のみを出力します。タスクはキューに追加されるよりもはるかに高速に処理されるため、この状況では1つのコンシューマースレッドで十分だと思いますが、実際には、プロセス内のスレッドの2番目の数が1つ増えるということです... TPLはすべてのアイテムに新しいスレッドを作成しています

何が起こっているのかを理解しようとした後、私は別のことに気づきました。BlockingCollectionのサイズは1ですが、しばらくすると、コンシューマーがバーストで呼び出され始めます。たとえば、次のように開始します。

エンキュージョブ

ジョブ処理スレッド:4処理スレッド数:9

エンキュージョブ

ジョブ処理スレッド:6処理スレッド数:9

エンキュージョブ

ジョブ処理スレッド:5プロセススレッド数:10

エンキュージョブ

ジョブ処理スレッド:4処理スレッド数:10

エンキュージョブ

ジョブ処理スレッド:6処理スレッド数:11

これは、1分以内にアイテムを処理する方法です。

エンキュージョブ

ジョブ処理スレッド:25プロセススレッド数:52

エンキュージョブ

エンキュージョブ

ジョブ処理スレッド:5プロセススレッド数:54

ジョブ処理スレッド:5プロセススレッド数:54

Parallel.ForEachループの終了後にスレッドが破棄されるため(この例では示していませんが、実際のプロジェクトにありました)、特にForEachと関係があると思いました...この記事を見つけましたhttp: //reedcopsey.com/2010/01/26/parallelism-in-net-part-5-partitioning-of-work/、そして私の問題はこのデフォルトのパーティショナーが原因であると思ったので、TPLの例からカスタムパーティショナーを取得しましたこれは、コンシューマスレッドアイテムを1つずつフィードし、実行の順序を修正しましたが(遅延を取り除きました)...

エンキュージョブ

ジョブ処理スレッド:71プロセススレッド数:140

エンキュージョブ

ジョブ処理スレッド:12プロセススレッド数:141

エンキュージョブ

ジョブ処理スレッド:72プロセススレッド数:142

エンキュージョブ

ジョブ処理スレッド:38プロセススレッド数:143

エンキュージョブ

ジョブ処理スレッド:73プロセススレッド数:143

エンキュージョブ

ジョブ処理スレッド:21プロセススレッド数:144

エンキュージョブ

ジョブ処理スレッド:74プロセススレッド数:145

...スレッドの成長を止めませんでした

ParallelOptions.MaxDegreeOfParallelismについては知っていますが、TPLで何が起こっているのか、なぜ理由もなく何百ものスレッドが作成されるのかを理解したいと思います。

私のプロジェクトでは、何時間も実行してデータベースから新しいデータを読み取り、それをBlockingCollectionsに入れ、他のコードでデータを処理する必要があるコードです。約5秒ごとに1つの新しいアイテムがあり、数ミリ秒からほぼ処理に1分かかり、約10分間実行した後、スレッド数が1000スレッドを超えました。

4

1 に答える 1

6

一緒にこの動作を引き起こす2つのことがあります:

  1. ThreadPool状況に最適な数のスレッドを使用しようとします。ただし、プール内のスレッドの1つがブロックされている場合、プールはそのスレッドが有用な作業を行っていないかのようにこれを認識し、その直後に別のスレッドを作成する傾向があります。これが意味するのは、ブロッキングが多い場合ThreadPool、最適なスレッド数を推測するのが非常に悪く、制限に達するまで新しいスレッドを作成する傾向があるということです。

  2. Parallel.ForEach()ThreadPoolスレッドの最大数を明示的に設定しない限り、はスレッドの正しい数を推測することを信頼します。Parallel.ForEach()また、データのストリームではなく、主に制限されたコレクションを対象としていました。

これら2つをと組み合わせるとGetConsumingEnumerable()Parallel.ForEach()ほとんどの場合ブロックされるスレッドが作成されます。はこれThreadPoolを認識し、CPUの使用率を維持するために、ますます多くのスレッドを作成します。

ここでの正しい解決策は、を設定することMaxDegreeOfParallelismです。計算がCPUにバインドされている場合、最良の値はおそらくEnvironment.ProcessorCountです。それらがIOバウンドである場合は、実験的に最良の値を見つける必要があります。

.Net 4.5を使用できる場合の別のオプションは、TPLデータフローを使用することです。このライブラリは、あなたが持っているようにデータのストリームを処理するために特別に作られたので、あなたのコードが抱えている問題はありません。実際にはそれよりも優れており、現在何も処理していないときはスレッドをまったく使用しません。

注:新しいアイテムごとに新しいスレッドが作成されるのには十分な理由もありますが、それを説明するには、どのようにParallel.ForEach()機能するかをより詳細に説明する必要があり、ここでは必要ないと思います。

于 2012-08-30T08:22:28.193 に答える