C#TPLを使用していますが、プロデューサー/コンシューマーコードに問題があります...何らかの理由で、TPLはスレッドを再利用せず、停止せずに新しいスレッドを作成し続けます
この動作を示す簡単な例を作成しました。
class Program
{
static BlockingCollection<int> m_Buffer = new BlockingCollection<int>(1);
static CancellationTokenSource m_Cts = new CancellationTokenSource();
static void Producer()
{
try
{
while (!m_Cts.IsCancellationRequested)
{
Console.WriteLine("Enqueuing job");
m_Buffer.Add(0);
Thread.Sleep(1000);
}
}
finally
{
m_Buffer.CompleteAdding();
}
}
static void Consumer()
{
Parallel.ForEach(m_Buffer.GetConsumingEnumerable(), Run);
}
static void Run(int i)
{
Console.WriteLine
("Job Processed\tThread: {0}\tProcess Thread Count: {1}",
Thread.CurrentThread.ManagedThreadId,
Process.GetCurrentProcess().Threads.Count);
}
static void Main(string[] args)
{
Task producer = new Task(Producer);
Task consumer = new Task(Consumer);
producer.Start();
consumer.Start();
Console.ReadKey();
m_Cts.Cancel();
Task.WaitAll(producer, consumer);
}
}
このコードは、プロデューサーとコンシューマーの2つのタスクを作成します。Producesは毎秒1つの作業項目を追加し、Consumerは情報を含む文字列のみを出力します。タスクはキューに追加されるよりもはるかに高速に処理されるため、この状況では1つのコンシューマースレッドで十分だと思いますが、実際には、プロセス内のスレッドの2番目の数が1つ増えるということです... TPLはすべてのアイテムに新しいスレッドを作成しています
何が起こっているのかを理解しようとした後、私は別のことに気づきました。BlockingCollectionのサイズは1ですが、しばらくすると、コンシューマーがバーストで呼び出され始めます。たとえば、次のように開始します。
エンキュージョブ
ジョブ処理スレッド:4処理スレッド数:9
エンキュージョブ
ジョブ処理スレッド:6処理スレッド数:9
エンキュージョブ
ジョブ処理スレッド:5プロセススレッド数:10
エンキュージョブ
ジョブ処理スレッド:4処理スレッド数:10
エンキュージョブ
ジョブ処理スレッド:6処理スレッド数:11
これは、1分以内にアイテムを処理する方法です。
エンキュージョブ
ジョブ処理スレッド:25プロセススレッド数:52
エンキュージョブ
エンキュージョブ
ジョブ処理スレッド:5プロセススレッド数:54
ジョブ処理スレッド:5プロセススレッド数:54
Parallel.ForEachループの終了後にスレッドが破棄されるため(この例では示していませんが、実際のプロジェクトにありました)、特にForEachと関係があると思いました...この記事を見つけましたhttp: //reedcopsey.com/2010/01/26/parallelism-in-net-part-5-partitioning-of-work/、そして私の問題はこのデフォルトのパーティショナーが原因であると思ったので、TPLの例からカスタムパーティショナーを取得しましたこれは、コンシューマスレッドアイテムを1つずつフィードし、実行の順序を修正しましたが(遅延を取り除きました)...
エンキュージョブ
ジョブ処理スレッド:71プロセススレッド数:140
エンキュージョブ
ジョブ処理スレッド:12プロセススレッド数:141
エンキュージョブ
ジョブ処理スレッド:72プロセススレッド数:142
エンキュージョブ
ジョブ処理スレッド:38プロセススレッド数:143
エンキュージョブ
ジョブ処理スレッド:73プロセススレッド数:143
エンキュージョブ
ジョブ処理スレッド:21プロセススレッド数:144
エンキュージョブ
ジョブ処理スレッド:74プロセススレッド数:145
...スレッドの成長を止めませんでした
ParallelOptions.MaxDegreeOfParallelismについては知っていますが、TPLで何が起こっているのか、なぜ理由もなく何百ものスレッドが作成されるのかを理解したいと思います。
私のプロジェクトでは、何時間も実行してデータベースから新しいデータを読み取り、それをBlockingCollectionsに入れ、他のコードでデータを処理する必要があるコードです。約5秒ごとに1つの新しいアイテムがあり、数ミリ秒からほぼ処理に1分かかり、約10分間実行した後、スレッド数が1000スレッドを超えました。