背景情報については、次の質問を参照してください。
タスク並列ライブラリのタスクはActivityIDにどのように影響しますか?
その質問は、タスクがTrace.CorrelationManager.ActivityIdにどのように影響するかを尋ねます。@Greg Samsonは、ActivityIdがタスクのコンテキストで信頼できることを示すテストプログラムで自分の質問に答えました。テストプログラムは、タスクデリゲートの最初にActivityIdを設定し、スリープして作業をシミュレートし、最後にActivityIdをチェックして、同じ値であること(つまり、別のスレッドによって変更されていないこと)を確認します。プログラムは正常に実行されます。
スレッド化、タスク、および並列操作のための他の「コンテキスト」オプションを調査しているときに(最終的にはロギングのコンテキストを改善するため)、Trace.CorrelationManager.LogicalOperationStackで奇妙な問題に遭遇しました(とにかく奇妙でした)。以下の彼の質問に対する私の「答え」をコピーしました。
私が遭遇した問題を適切に説明していると思います(Parallel.Forのコンテキストで使用すると、Trace.CorrelationManager.LogicalOperationStackが明らかに破損している(または何か)が、Parallel.For自体が論理演算で囲まれている場合のみ) 。
これが私の質問です:
Trace.CorrelationManager.LogicalOperationStackはParallel.Forで使用できる必要がありますか?もしそうなら、Parallel.Forが開始されたときに論理演算がすでに有効になっている場合、それは違いを生むべきですか?
Parallel.ForでLogicalOperationStackを使用する「正しい」方法はありますか?このサンプルプログラムを別の方法でコーディングして、「機能」させることはできますか?「動作する」とは、LogicalOperationStackには常に予想される数のエントリがあり、エントリ自体が予想されるエントリであることを意味します。
ThreadsとThreadPoolスレッドを使用していくつかの追加のテストを実行しましたが、同様の問題が発生したかどうかを確認するために、戻ってそれらのテストを再試行する必要があります。
Task / ParallelスレッドとThreadPoolスレッドは、親スレッドからTrace.CorrelationManager.ActivityIdとTrace.CorrelationManager.LogicalOperationStackの値を「継承」しているように見えます。これらの値は、(SetDataではなく) CallContextのLogicalSetDataメソッドを使用してCorrelationManagerによって格納されるため、これは予想されます。
繰り返しになりますが、この質問に戻って、以下に投稿した「回答」の元のコンテキストを取得してください。
タスク並列ライブラリのタスクはActivityIDにどのように影響しますか?
MicrosoftのParallelExtensionsフォーラムで、この同様の質問(これまでのところ回答されていません)も参照してください。
[貼り付けを開始]
これは実際にはあなたの質問に対する答えではないので、私の投稿を答えとして許してください。ただし、CorrelationManagerの動作やスレッド/タスクなどを扱っているため、あなたの質問に関連しています。私は、CorrelationManager LogicalOperationStack
(およびStartLogicalOperation/StopLogicalOperation
メソッド)を使用して、マルチスレッドシナリオで追加のコンテキストを提供することを検討してきました。
私はあなたの例を取り上げ、Parallel.Forを使用して並行して作業を実行する機能を追加するために少し変更しました。また、私はStartLogicalOperation/StopLogicalOperation
(内部的に)ブラケットに使用しDoLongRunningWork
ます。概念的にDoLongRunningWork
は、実行されるたびに次のようなことを行います。
DoLongRunningWork
StartLogicalOperation
Thread.Sleep(3000)
StopLogicalOperation
これらの論理演算を(多かれ少なかれそのまま)コードに追加すると、すべての論理演算が同期されたままになることがわかりました(常にスタック上の予想される演算数とスタック上の演算の値は常に次のようになります)期待される)。
私自身のテストのいくつかでは、これが常に当てはまるとは限らないことがわかりました。論理演算スタックが「破損」していました。私が思いついた最も良い説明は、「子」スレッドが終了するときにCallContext情報を「親」スレッドコンテキストに「マージ」すると、「古い」子スレッドコンテキスト情報(論理操作)が「別の兄弟の子スレッドによって継承されました」。
この問題は、Parallel.Forが明らかにメインスレッド(少なくともサンプルコードでは、記述されているとおり)を「ワーカースレッド」(または並列ドメインで呼び出されるもの)の1つとして使用しているという事実に関連している可能性があります。DoLongRunningWorkが実行されるたびに、新しい論理操作が開始(開始時)および停止(終了時)されます(つまり、LogicalOperationStackにプッシュされてポップバックされます)。メインスレッドですでに論理演算が有効になっていて、DoLongRunningWorkがメインスレッドで実行されている場合、新しい論理演算が開始されるため、メインスレッドのLogicalOperationStackには2つの演算があります。DoLongRunningWorkの後続の実行(DoLongRunningWorkのこの「反復」がメインスレッドで実行されている限り)は、(明らかに)メインスレッドを継承します。
私の例では、LogicalOperationStackの動作が、変更したバージョンの例と異なる理由を理解するのに長い時間がかかりました。最後に、私のコードではプログラム全体を論理演算で囲んでいたのに対し、テストプログラムの修正バージョンではそうではなかったことがわかりました。これは、私のテストプログラムでは、「作業」が実行されるたびに(DoLongRunningWorkに類似)、すでに論理演算が有効になっていることを意味します。テストプログラムの修正バージョンでは、プログラム全体を論理演算で囲んでいませんでした。
したがって、プログラム全体を論理演算で囲むようにテストプログラムを変更したとき、およびParallel.Forを使用している場合は、まったく同じ問題が発生しました。
上記の概念モデルを使用すると、これは正常に実行されます。
Parallel.For
DoLongRunningWork
StartLogicalOperation
Sleep(3000)
StopLogicalOperation
明らかに同期していないLogicalOperationStackが原因で、これは最終的にアサートされますが、次のようになります。
StartLogicalOperation
Parallel.For
DoLongRunningWork
StartLogicalOperation
Sleep(3000)
StopLogicalOperation
StopLogicalOperation
これが私のサンプルプログラムです。これは、ActivityIdとLogicalOperationStackを操作するDoLongRunningWorkメソッドがあるという点であなたのものと似ています。DoLongRunningWorkのキックには2つのフレーバーがあります。1つのフレーバーはParallel.Forを使用するタスクを使用します。各フレーバーは、並列化された操作全体が論理操作で囲まれるかどうかに関係なく実行することもできます。したがって、並列操作を実行するには、合計4つの方法があります。それぞれを試すには、目的の「Use ...」メソッドのコメントを解除し、再コンパイルして実行します。 UseTasks
、、UseTasks(true)
およびUseParallelFor
すべてが完了するまで実行する必要があります。 UseParallelFor(true)
LogicalOperationStackには予想される数のエントリがないため、ある時点でアサートされます。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace CorrelationManagerParallelTest
{
class Program
{
static void Main(string[] args)
{
//UseParallelFor(true) will assert because LogicalOperationStack will not have expected
//number of entries, all others will run to completion.
UseTasks(); //Equivalent to original test program with only the parallelized
//operation bracketed in logical operation.
////UseTasks(true); //Bracket entire UseTasks method in logical operation
////UseParallelFor(); //Equivalent to original test program, but use Parallel.For
//rather than Tasks. Bracket only the parallelized
//operation in logical operation.
////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
}
private static List<int> threadIds = new List<int>();
private static object locker = new object();
private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;
private static int mainThreadUsedInDelegate = 0;
// baseCount is the expected number of entries in the LogicalOperationStack
// at the time that DoLongRunningWork starts. If the entire operation is bracketed
// externally by Start/StopLogicalOperation, then baseCount will be 1. Otherwise,
// it will be 0.
private static void DoLongRunningWork(int baseCount)
{
lock (locker)
{
//Keep a record of the managed thread used.
if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
threadIds.Add(Thread.CurrentThread.ManagedThreadId);
if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
{
mainThreadUsedInDelegate++;
}
}
Guid lo1 = Guid.NewGuid();
Trace.CorrelationManager.StartLogicalOperation(lo1);
Guid g1 = Guid.NewGuid();
Trace.CorrelationManager.ActivityId = g1;
Thread.Sleep(3000);
Guid g2 = Trace.CorrelationManager.ActivityId;
Debug.Assert(g1.Equals(g2));
//This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
//in effect when the Parallel.For operation was started.
Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));
Trace.CorrelationManager.StopLogicalOperation();
}
private static void UseTasks(bool encloseInLogicalOperation = false)
{
int totalThreads = 100;
TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
Task task = null;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StartLogicalOperation();
}
Task[] allTasks = new Task[totalThreads];
for (int i = 0; i < totalThreads; i++)
{
task = Task.Factory.StartNew(() =>
{
DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
}, taskCreationOpt);
allTasks[i] = task;
}
Task.WaitAll(allTasks);
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StopLogicalOperation();
}
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
Console.ReadKey();
}
private static void UseParallelFor(bool encloseInLogicalOperation = false)
{
int totalThreads = 100;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StartLogicalOperation();
}
Parallel.For(0, totalThreads, i =>
{
DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
});
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StopLogicalOperation();
}
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
Console.ReadKey();
}
}
}
LogicalOperationStackをParallel.For(および/または他のスレッド/タスク構造)で使用できるかどうか、またはどのように使用できるかというこの問題全体は、おそらく独自の質問に値します。多分私は質問を投稿します。それまでの間、これについて何か考えがあるかどうか疑問に思います(または、ActivityIdは安全であると思われるため、LogicalOperationStackの使用を検討したかどうか疑問に思います)。
[貼り付け終了]
誰かがこの問題について何か考えを持っていますか?