1

これを、機能するいくつかのことをどのように理解するかについての基本的な説明から始め、tldr ですべてを締めくくります。人々が単に私がここで抱えている実際の質問に到達したい場合. ここでの私の理解が間違っている場合は、訂正してください。

TPL は Task Parallel Library の略で、開発者が使いやすいようにスレッド化をさらに簡素化しようとする .NET 4.0 の答えです。慣れていない場合は (非常に基本的なレベルで)、新しい Task オブジェクトを開始し、それにデリゲートを渡します。デリゲートは、スレッド プールから取得したバックグラウンド スレッドで実行されます (真にタスクを作成するのではなく、スレッド プールを使用することにより)。新しいスレッドを作成して破棄する代わりに、これらの既存のスレッドを使用することで、時間とリソースを節約できます)。

私が理解していることから、C# の Parallel.ForEach コマンドは、実行することになっているデリゲートごとに (おそらくスレッド プールから) 新しいスレッドを生成します。コンパイラーが、より効率的になるのに十分な速さで反復が行われると判断した場合。

私の目標に関する最も関連性の高い背景情報:

タスクから開始して、残りのプログラムと同時に実行する簡単なプログラムを作成しようとしています。このタスクでは、Parallel.ForEach が 3 回の「反復」で実行されます。合計で、プログラムは合計 5 つのスレッド (最大) を実行すると予想されます。メイン スレッドに 1 つ、実際のタスクに 1 つ、Parallel.ForEach に最大 3 つです。各スレッドには、達成する独自の目標があります (ただし、Parallel.ForEach はすべて、計算する関連する itemNumber に異なる値を持つ同じ目標を持っています。メイン スレッドがすべての目標を完了すると、Task.Wait() を使用して待機します)。 Parallel.ForEach も終了するのを待つ終了タスクで、値が使用され、検証されます。

tldr; 実際の質問:

前述のアイデアが実行されると、Parallel.ForEach は、予想される数の 2 倍の SynchronizationContexts (本質的には別のスレッドである TPL オブジェクト) を初期化し、それらすべてを実行しているように見えますが、予想される数だけ待機しています。Parallel.ForEach().Wait() コマンドは実行中のスレッドの予想数で終了するため、Task もすべてが完了したと見なして終了します。その後、メイン プログラムは Task が終了したことを検出し、現在バックグラウンド スレッドが実行されていないことを確認すると、残りの Parallel.ForEach() がまだ終了していないためにエラーがスローされることがあります。

各 SynchronizationContext の post 呼び出し (Async メソッドのキッカー) でデバッグ ウィンドウに出力することによって、スレッドの量が私が述べたものと一致することが確認されました。各スレッドは、タスクの終了時に破棄される予定のメイン スレッド オブジェクトによっても参照されますが、実際には作成されるとは想定されていなかった未終了のスレッドが原因で参照がまだ残っているため、破棄は適切に行われません。

Thread testThread = Thread.CurrentThread;
Task backgroundTask = taskFactory.StartNew(() =>
{
    Thread rootTaskThread = Thread.CurrentThread;
    Assert.AreNotEqual(testThread, rootTaskThread, "First task should not inline");
    Thread.Sleep(TimeSpan.FromSeconds(2));

    Parallel.ForEach(new[] { 1, 2, 3, 4 },
       new ParallelOptions { TaskScheduler = taskFactory.Scheduler }, (int item) => {
        Thread.Sleep(TimeSpan.FromSeconds(1));
     });
});

上記の例では、メイン スレッド、backgroundTask タスク、および 8 つの Parallel.ForEach スレッドが存在することになり、そのうちの最後の 9 つは SynchronizationContexts で作成されます。

私のカスタムの SynchronizationContext でオーバーライドされる唯一のメソッドは post で、次のとおりです。

public override void Post(SendOrPostCallback d, object state){
    Request requestOrNull = Request.ExistsForCurrentThread() ? Request.GetForCurrentThread() as Request : null;
    Request.IAsyncContextData requestData = null;

    if (requestOrNull != null){
       requestData = requestOrNull.CaptureDataForNewThreadAndIncrementReferenceCount();
    }

    Debug.WriteLine("Task started - request data " + (requestData == null ? "DOES NOT EXIST" : "EXISTS"));

    base.Post((object internalState) => {
        // Capture the spawned thread state and restore the originating thread state
        try{
            if (requestData != null){
                Request.AttachToAsynchronousContext(requestData);
            }
            d(state);
        }
        finally{
            // Restore original spawned thread state
            if (requestData != null){
            // Disposes the request if this is the last reference to it
                Request.DetachFromAsynchronousContext(requestData);
            }
        Debug.WriteLine("Task completed - request data " + (requestData == null ? "DOES NOT EXIST" : "EXISTS"));
        }
    }, state);
 }

私が信じている TaskScheduler は、必要な基本的なことだけを行っています。

private readonly RequestSynchronizationContext context;
private readonly ConcurrentQueue<Task> tasks = new ConcurrentQueue<Task>();

public RequestTaskScheduler(RequestSynchronizationContext synchronizationContext)
{
    this.context = synchronizationContext;
}

protected override void QueueTask(Task task){
    this.tasks.Enqueue(task);
    this.context.Post((object state) => {
        Task nextTask;
        if (this.tasks.TryDequeue(out nextTask)) 
            this.TryExecuteTask(nextTask);
    }, null);
}

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued){
    if (SynchronizationContext.Current == this.context)
        return this.TryExecuteTask(task);
    else
        return false;
}

protected override IEnumerable<Task> GetScheduledTasks(){
    return this.tasks.ToArray();
}

タスクファクトリー:

public RequestTaskFactory(RequestTaskScheduler taskScheduler)
    : base(taskScheduler)
{ }

なぜこれが起こっているのかについてのアイデアはありますか?

4

1 に答える 1