11

背景情報については、次の質問を参照してください。

タスク並列ライブラリのタスクはActivityIDにどのように影響しますか?

その質問は、タスクがTrace.CorrelationManager.ActivityIdにどのように影響するかを尋ねます。@Greg Samsonは、ActivityIdがタスクのコンテキストで信頼できることを示すテストプログラムで自分の質問に答えました。テストプログラムは、タスクデリゲートの最初にActivityIdを設定し、スリープして作業をシミュレートし、最後にActivityIdをチェックして、同じ値であること(つまり、別のスレッドによって変更されていないこと)を確認します。プログラムは正常に実行されます。

スレッド化、タスク、および並列操作のための他の「コンテキスト」オプションを調査しているときに(最終的にはロギングのコンテキストを改善するため)、Trace.CorrelationManager.LogicalOperationStackで奇妙な問題に遭遇しました(とにかく奇妙でした)。以下の彼の質問に対する私の「答え」をコピーしました。

私が遭遇した問題を適切に説明していると思います(Parallel.Forのコンテキストで使用すると、Trace.CorrelationManager.LogicalOperationStackが明らかに破損している(または何か)が、Parallel.For自体が論理演算で囲まれている場合のみ) 。

これが私の質問です:

  1. Trace.CorrelationManager.LogicalOperationStackはParallel.Forで使用できる必要がありますか?もしそうなら、Parallel.Forが開始されたときに論理演算がすでに有効になっている場合、それは違いを生むべきですか?

  2. Parallel.ForでLogicalOperationStackを使用する「正しい」方法はありますか?このサンプルプログラムを別の方法でコーディングして、「機能」させることはできますか?「動作する」とは、LogicalOperationStackには常に予想される数のエントリがあり、エントリ自体が予想されるエントリであることを意味します。

ThreadsとThreadPoolスレッドを使用していくつかの追加のテストを実行しましたが、同様の問題が発生したかどうかを確認するために、戻ってそれらのテストを再試行する必要があります。

Task / ParallelスレッドとThreadPoolスレッドは、親スレッドからTrace.CorrelationManager.ActivityIdとTrace.CorrelationManager.LogicalOperationStackの値を「継承」しているように見えます。これらの値は、(SetDataではなく) CallContextのLogicalSetDataメソッドを使用してCorrelationManagerによって格納されるため、これは予想されます。

繰り返しになりますが、この質問に戻って、以下に投稿した「回答」の元のコンテキストを取得してください。

タスク並列ライブラリのタスクはActivityIDにどのように影響しますか?

MicrosoftのParallelExtensionsフォーラムで、この同様の質問(これまでのところ回答されていません)も参照してください。

http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

[貼り付けを開始]

これは実際にはあなたの質問に対する答えではないので、私の投稿を答えとして許してください。ただし、CorrelationManagerの動作やスレッド/タスクなどを扱っているため、あなたの質問に関連しています。私は、CorrelationManager LogicalOperationStack(およびStartLogicalOperation/StopLogicalOperationメソッド)を使用して、マルチスレッドシナリオで追加のコンテキストを提供することを検討してきました。

私はあなたの例を取り上げ、Parallel.Forを使用して並行して作業を実行する機能を追加するために少し変更しました。また、私はStartLogicalOperation/StopLogicalOperation(内部的に)ブラケットに使用しDoLongRunningWorkます。概念的にDoLongRunningWorkは、実行されるたびに次のようなことを行います。

DoLongRunningWork
  StartLogicalOperation
  Thread.Sleep(3000)
  StopLogicalOperation

これらの論理演算を(多かれ少なかれそのまま)コードに追加すると、すべての論理演算が同期されたままになることがわかりました(常にスタック上の予想される演算数とスタック上の演算の値は常に次のようになります)期待される)。

私自身のテストのいくつかでは、これが常に当てはまるとは限らないことがわかりました。論理演算スタックが「破損」していました。私が思いついた最も良い説明は、「子」スレッドが終了するときにCallContext情報を「親」スレッドコンテキストに「マージ」すると、「古い」子スレッドコンテキスト情報(論理操作)が「別の兄弟の子スレッドによって継承されました」。

この問題は、Parallel.Forが明らかにメインスレッド(少なくともサンプルコードでは、記述されているとおり)を「ワーカースレッド」(または並列ドメインで呼び出されるもの)の1つとして使用しているという事実に関連している可能性があります。DoLongRunningWorkが実行されるたびに、新しい論理操作が開始(開始時)および停止(終了時)されます(つまり、LogicalOperationStackにプッシュされてポップバックされます)。メインスレッドですでに論理演算が有効になっていて、DoLongRunningWorkがメインスレッドで実行されている場合、新しい論理演算が開始されるため、メインスレッドのLogicalOperationStackには2つの演算があります。DoLongRunningWorkの後続の実行(DoLongRunningWorkのこの「反復」がメインスレッドで実行されている限り)は、(明らかに)メインスレッドを継承します。

私の例では、LogicalOperationStackの動作が、変更したバージョンの例と異なる理由を理解するのに長い時間がかかりました。最後に、私のコードではプログラム全体を論理演算で囲んでいたのに対し、テストプログラムの修正バージョンではそうではなかったことがわかりました。これは、私のテストプログラムでは、「作業」が実行されるたびに(DoLongRunningWorkに類似)、すでに論理演算が有効になっていることを意味します。テストプログラムの修正バージョンでは、プログラム全体を論理演算で囲んでいませんでした。

したがって、プログラム全体を論理演算で囲むようにテストプログラムを変更したとき、およびParallel.Forを使用している場合は、まったく同じ問題が発生しました。

上記の概念モデルを使用すると、これは正常に実行されます。

Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation

明らかに同期していないLogicalOperationStackが原因で、これは最終的にアサートされますが、次のようになります。

StartLogicalOperation
Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
StopLogicalOperation

これが私のサンプルプログラムです。これは、ActivityIdとLogicalOperationStackを操作するDoLongRunningWorkメソッドがあるという点であなたのものと似ています。DoLongRunningWorkのキックには2つのフレーバーがあります。1つのフレーバーはParallel.Forを使用するタスクを使用します。各フレーバーは、並列化された操作全体が論理操作で囲まれるかどうかに関係なく実行することもできます。したがって、並列操作を実行するには、合計4つの方法があります。それぞれを試すには、目的の「Use ...」メソッドのコメントを解除し、再コンパイルして実行します。 UseTasks、、UseTasks(true)およびUseParallelForすべてが完了するまで実行する必要があります。 UseParallelFor(true)LogicalOperationStackには予想される数のエントリがないため、ある時点でアサートされます。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace CorrelationManagerParallelTest
{
  class Program 
  {     
    static void Main(string[] args)     
    { 
      //UseParallelFor(true) will assert because LogicalOperationStack will not have expected
      //number of entries, all others will run to completion.

      UseTasks(); //Equivalent to original test program with only the parallelized
                      //operation bracketed in logical operation.
      ////UseTasks(true); //Bracket entire UseTasks method in logical operation
      ////UseParallelFor();  //Equivalent to original test program, but use Parallel.For
                             //rather than Tasks.  Bracket only the parallelized
                             //operation in logical operation.
      ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
    }       

    private static List<int> threadIds = new List<int>();     
    private static object locker = new object();     

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;

    private static int mainThreadUsedInDelegate = 0;

    // baseCount is the expected number of entries in the LogicalOperationStack
    // at the time that DoLongRunningWork starts.  If the entire operation is bracketed
    // externally by Start/StopLogicalOperation, then baseCount will be 1.  Otherwise,
    // it will be 0.
    private static void DoLongRunningWork(int baseCount)     
    {
      lock (locker)
      {
        //Keep a record of the managed thread used.             
        if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
          threadIds.Add(Thread.CurrentThread.ManagedThreadId);

        if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
        {
          mainThreadUsedInDelegate++;
        }
      }         

      Guid lo1 = Guid.NewGuid();
      Trace.CorrelationManager.StartLogicalOperation(lo1);

      Guid g1 = Guid.NewGuid();         
      Trace.CorrelationManager.ActivityId = g1;

      Thread.Sleep(3000);         

      Guid g2 = Trace.CorrelationManager.ActivityId;
      Debug.Assert(g1.Equals(g2));

      //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
      //in effect when the Parallel.For operation was started.
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));

      Trace.CorrelationManager.StopLogicalOperation();
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
      Task task = null;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Task[] allTasks = new Task[totalThreads];
      for (int i = 0; i < totalThreads; i++)
      {
        task = Task.Factory.StartNew(() =>
        {
          DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
        }, taskCreationOpt);
        allTasks[i] = task;
      }
      Task.WaitAll(allTasks);

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

    private static void UseParallelFor(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Parallel.For(0, totalThreads, i =>
      {
        DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
      });

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

  } 
}

LogicalOperationStackをParallel.For(および/または他のスレッド/タスク構造)で使用できるかどうか、またはどのように使用できるかというこの問題全体は、おそらく独自の質問に値します。多分私は質問を投稿します。それまでの間、これについて何か考えがあるかどうか疑問に思います(または、ActivityIdは安全であると思われるため、LogicalOperationStackの使用を検討したかどうか疑問に思います)。

[貼り付け終了]

誰かがこの問題について何か考えを持っていますか?

4

2 に答える 2

6

[更新を開始]

また、 MicrosoftのParallel Extensions for .Netサポートフォーラムでこの質問をし、最終的にStephenToubから回答を受け取りました。論理操作スタックが破損する原因となっているLogicalCallContextにバグがあることが判明しました。また、Parallel.Forのタスクの実行に関する作業と、Parallel.Forがバグの影響を受けやすい理由について簡単に説明した、すばらしい説明(Stephenによる彼の回答に対する回答へのフォローアップ)もあります。

以下の私の答えでは、Parallel.Forはメインスレッドを「ワーカー」スレッドの1つとして使用するため、LogicalOperationStackはParallel.Forと互換性がないと推測しています。スティーブンの説明に基づいて、私の推測は間違っていました。Parallel.Forは、メインスレッドを「ワーカー」スレッドの1つとして使用しますが、単に「そのまま」使用されるわけではありません。最初のタスクはメインスレッドで実行されますが、新しいスレッドで実行されているかのように実行されます。詳細については、スティーブンの説明をお読みください。

【更新終了】

私の知る限り、答えは次のとおりです。

ActivityIdとLogicalOperationStackはどちらも、CallContext.LogicalSetDataを介して保存されます。つまり、これらの値はすべての「子」スレッドに「フロー」されます。たとえば、エントリポイントのActivityIdをマルチスレッドサーバー(サービス呼び出しなど)に設定すると、そのエントリポイントから最終的に開始されるすべてのスレッドを同じ「アクティビティ」の一部にすることができるので、これは非常に便利です。同様に、(LogicalOperationStackを介した)論理操作も子スレッドに流れます。

Trace.CorrelationManager.ActivityIdに関して:

ActivityIdは、私がテストしたすべてのスレッドモデルと互換性があるようです。スレッドを直接使用する、ThreadPoolを使用する、Tasksを使用する、Parallelを使用する。*。すべての場合において、ActivityIdには期待値があります。

Trace.CorrelationManager.LogicalOperationStackに関して:

LogicalOperationStackは、ほとんどのスレッドモデルと互換性があるようですが、Parallel。*とは互換性がありません。スレッドを直接使用して、ThreadPoolとTasksを使用して、LogicalOperationStack(私の質問で提供されているサンプルコードで操作されている)はその整合性を維持します。常にLogicalOperationStackの内容は期待どおりです。

LogicalOperationStackはParallel.Forと互換性がありません。論理演算が「有効」である場合、つまり、Parallel。*演算を開始する前に、CorrelationManager.StartLogicalOperationを呼び出してから、Paralle。*のコンテキストで(つまり、デリゲートで)新しい論理演算を開始した場合。の場合、LogicalOperationStackは破損します。(おそらく破損する可能性があります。Parallel。*は追加のスレッドを作成しない可能性があります。これは、LogicalOperationStackが安全であることを意味します)。

この問題は、Parallel。*がメインスレッド(または、より正確には、並列操作を開始するスレッド)を「ワーカー」スレッドの1つとして使用するという事実に起因します。つまり、「論理操作」は「メイン」スレッドと同じ「ワーカー」スレッドで開始および停止されるため、「メイン」スレッドのLogicalOperationStackが変更されます。呼び出し元のコード(つまりデリゲート)がスタックを正しく維持している場合でも(各StartLogicalOperationが対応するStopLogicalOperationで「停止」されていることを確認)、「メイン」スレッドスタックが変更されます。最終的には(とにかく)、「メイン」スレッドのLogicalOperationStackは、基本的に2つの異なる「論理」スレッドによって変更されているように見えます。「メイン」

なぜこれが機能しないのか(少なくとも私が期待するように)の詳細はわかりません。私の推測では、デリゲートがスレッド(メインスレッドと同じではない)で実行されるたびに、スレッドはメインスレッドのLogicalOperationStackの現在の状態を「継承」します。デリゲートが現在メインスレッドで実行されており(ワーカースレッドとして再利用されている)、論理操作を開始した場合、他の並列化されたデリゲートの1つ(または複数)が、現在のメインスレッドのLogicalOperationStackを「継承」します。 1つ(または複数)の新しい論理演算が有効になっています!

FWIW、私は(主にテストのために、現時点では実際には使用していません)、LogicalOperationStackを模倣するために次の「論理スタック」を実装しましたが、Parallelで動作するように実行します。*お気軽に試してくださいそれを出したり、使用したりします。テストするには、

Trace.CorrelationManager.StartLogicalOperation/StopLogicalOperation

私の元の質問からのサンプルコードで

LogicalOperation.OperationStack.Push()/Pop().


//OperationStack.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

using System.Runtime.Remoting.Messaging;

namespace LogicalOperation
{
  public static class OperationStack
  {
    private const string OperationStackSlot = "OperationStackSlot";

    public static IDisposable Push(string operation)
    {
      OperationStackItem parent = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
      OperationStackItem op = new OperationStackItem(parent, operation);
      CallContext.LogicalSetData(OperationStackSlot, op);
      return op;
    }

    public static object Pop()
    {
      OperationStackItem current = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;

      if (current != null)
      {
        CallContext.LogicalSetData(OperationStackSlot, current.Parent);
        return current.Operation;
      }
      else
      {
        CallContext.FreeNamedDataSlot(OperationStackSlot);
      }
      return null;
    }

    public static object Peek()
    {
      OperationStackItem top = Top();
      return top != null ? top.Operation : null;
    }

    internal static OperationStackItem Top()
    {
      OperationStackItem top = CallContext.LogicalGetData(OperationStackSlot) as OperationStackItem;
      return top;
    }

    public static IEnumerable<object> Operations()
    {
      OperationStackItem current = Top();
      while (current != null)
      {
        yield return current.Operation;
        current = current.Parent;
      }
    }

    public static int Count
    {
      get
      {
        OperationStackItem top = Top();
        return top == null ? 0 : top.Depth;
      }
    }

    public static IEnumerable<string> OperationStrings()
    {
      foreach (object o in Operations())
      {
        yield return o.ToString();
      }
    }
  }
}


//OperationStackItem.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace LogicalOperation
{
  public class OperationStackItem : IDisposable
  {
    private OperationStackItem parent = null;
    private object operation;
    private int depth;
    private bool disposed = false;

    internal OperationStackItem(OperationStackItem parentOperation, object operation)
    {
      parent = parentOperation;
      this.operation = operation;
      depth = parent == null ? 1 : parent.Depth + 1;
    }

    internal object Operation { get { return operation; } }
    internal int Depth { get { return depth; } }

    internal OperationStackItem Parent { get { return parent; } }

    public override string ToString()
    {
      return operation != null ? operation.ToString() : "";
    }

    #region IDisposable Members

    public void Dispose()
    {
      if (disposed) return;

      OperationStack.Pop();

      disposed = true;
    }

    #endregion
  }
}

これは、Brent VanderMeideがここで説明しているスコープオブジェクトに触発されました:http ://www.dnrtv.com/default.aspx?showNum = 114

このクラスは次のように使用できます。

public void MyFunc()
{
  using (LogicalOperation.OperationStack.Push("MyFunc"))
  {
    MyOtherFunc();
  }
}

public void MyOtherFunc()
{
  using (LogicalOperation.OperationStack.Push("MyOtherFunc"))
  {
    MyFinalFunc();
  }
}

public void MyFinalFunc()
{
  using (LogicalOperation.OperationStack.Push("MyFinalFunc"))
  {
    Console.WriteLine("Hello");
  }
}
于 2011-02-07T19:56:00.350 に答える
2

TPLを多用するアプリケーションで簡単に機能する論理スタックを作成する方法を調査していました。既存のコードを変更せずに必要なすべてのことを実行できるため、LogicalOperationStackを使用することにしました。しかし、次に、LogicalCallContextのバグについて読みました。

https://connect.microsoft.com/VisualStudio/feedback/details/609929/logicalcallcontext-clone-bug-when-correlationmanager-slot-is-present

だから私はこのバグの回避策を見つけようとしました、そして私はそれがTPL(ありがとうILSpy)のために働くようになったと思います:

public static class FixLogicalOperationStackBug
{
    private static bool _fixed = false;

    public static void Fix()
    {
        if (!_fixed)
        {
            _fixed = true;

            Type taskType = typeof(Task);
            var s_ecCallbackField = taskType.GetFields(BindingFlags.Static | BindingFlags.NonPublic).First(f => f.Name == "s_ecCallback");
            ContextCallback s_ecCallback = (ContextCallback)s_ecCallbackField.GetValue(null);

            ContextCallback injectedCallback = new ContextCallback(obj =>
            {
                // Next line will set the private field m_IsCorrelationMgr of LogicalCallContext which isn't cloned
                CallContext.LogicalSetData("System.Diagnostics.Trace.CorrelationManagerSlot", Trace.CorrelationManager.LogicalOperationStack);
                s_ecCallback(obj);
            });

            s_ecCallbackField.SetValue(null, injectedCallback);
        }
    }
}
于 2012-02-19T22:49:55.317 に答える