36

TaskCompletionSource.SetResult();戻る前にタスクを待っているコードを呼び出すことを発見しました。私の場合、デッドロックが発生します。

普通に起動する簡易版ですThread

void ReceiverRun()
    while (true)
    {
        var msg = ReadNextMessage();
        TaskCompletionSource<Response> task = requests[msg.RequestID];

        if(msg.Error == null)
            task.SetResult(msg);
        else
            task.SetException(new Exception(msg.Error));
    }
}

コードの「非同期」部分は次のようになります。

await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();

Wait は、実際には非非同期呼び出し内にネストされています。

SendAwaitResponse (簡略化)

public static Task<Response> SendAwaitResponse(string msg)
{
    var t = new TaskCompletionSource<Response>();
    requests.Add(GetID(msg), t);
    stream.Write(msg);
    return t.Task;
}

私の仮定では、2 番目の SendAwaitResponse は ThreadPool スレッドで実行されますが、ReceiverRun 用に作成されたスレッドで続行されます。

待っているコードを続行せずにタスクの結果を設定する方法はありますか?

アプリケーションはコンソール アプリケーションです

4

4 に答える 4

37

TaskCompletionSource.SetResult(); を発見しました。戻る前にタスクを待っているコードを呼び出します。私の場合、デッドロックが発生します。

はい、これを文書化したブログ投稿があります (私の知る限り、MSDN には文書化されていません)。デッドロックは、次の 2 つの理由で発生します。

  1. とのブロック コードが混在していasyncます (つまり、asyncメソッドが を呼び出していWaitます)。
  2. タスクの継続は、 を使用してスケジュールされTaskContinuationOptions.ExecuteSynchronouslyます。

最も単純な解決策から始めることをお勧めします。つまり、最初のもの (1) を削除します。asyncつまり、混合してWait呼び出しないでください:

await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();

代わりに、await一貫して使用してください。

await SendAwaitResponse("first message");
await SendAwaitResponse("second message");

必要に応じてWait、コール スタックのさらに上の別のポイント (メソッド内ではないasync) で行うことができます。

それが私の最も推奨される解決策です。ただし、2 番目のもの (2) を削除したい場合は、いくつかのトリックを行うことができます: a でラップしSetResultTask.Run別のスレッドに強制する (私のAsyncEx ライブラリには*WithBackgroundContinuations、まさにこれを行う拡張メソッドがあります)。スレッドに実際のコンテキスト ( my AsyncContexttypeなど) を指定ConfigureAwait(false)し、 を指定すると、継続でフラグが無視されExecuteSynchronouslyます。

asyncしかし、これらのソリューションは、コードを分離してブロックするだけではなく、はるかに複雑です。

補足として、TPL Dataflowを見てください。便利だと思うかもしれません。

于 2013-10-21T14:31:42.040 に答える
6

アプリはコンソール アプリであるため、既定の同期コンテキストで実行されます。この場合await、待機中のタスクが完了した同じスレッドで継続コールバックが呼び出されます。の後にスレッドを切り替えたい場合はawait SendAwaitResponse、次のようにしawait Task.Yield()ます。

await SendAwaitResponse("first message");
await Task.Yield(); 
// will be continued on a pool thread
// ...
SendAwaitResponse("second message").Wait(); // so no deadlock

これをさらに改善するには、Thread.CurrentThread.ManagedThreadId内部に保存Task.Resultし、await. まだ同じスレッドにいる場合は、実行してくださいawait Task.Yield()

SendAwaitResponseこれは実際のコードの簡略化されたバージョンであることは理解していますが、内部では完全に同期しています(質問で示した方法)。そこにスレッドスイッチがあると期待するのはなぜですか?

とにかく、おそらく、現在どのスレッドにいるのかを推測しないように、ロジックを再設計する必要があります。混合awaitを避けTask.Wait()、すべてのコードを非同期にします。通常、Wait()トップ レベルのどこかに 1 つだけ (例: inside Main) を配置することができます。

[編集済み]task.SetResult(msg) from を呼び出すと、デフォルトの同期コンテキストの動作により、スレッド スイッチなしReceiverRunで実際に制御フローが転送されawaitます。したがって、実際のメッセージ処理を行うコードがスレッドtaskを引き継いでいます。ReceiverRun最終的SendAwaitResponse("second message").Wait()に、同じスレッドで呼び出され、デッドロックが発生します。

以下は、サンプルをモデルにしたコンソール アプリのコードです。await Task.Yield()内部を使用ProcessAsyncして別のスレッドで継続をスケジュールするため、制御フローが戻り、ReceiverRunデッドロックは発生しません。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    class Program
    {
        class Worker
        {
            public struct Response
            {
                public string message;
                public int threadId;
            }

            CancellationToken _token;
            readonly ConcurrentQueue<string> _messages = new ConcurrentQueue<string>();
            readonly ConcurrentDictionary<string, TaskCompletionSource<Response>> _requests = new ConcurrentDictionary<string, TaskCompletionSource<Response>>();

            public Worker(CancellationToken token)
            {
                _token = token;
            }

            string ReadNextMessage()
            {
                // using Thread.Sleep(100) for test purposes here,
                // should be using ManualResetEvent (or similar synchronization primitive),
                // depending on how messages arrive
                string message;
                while (!_messages.TryDequeue(out message))
                {
                    Thread.Sleep(100);
                    _token.ThrowIfCancellationRequested();
                }
                return message;
            }

            public void ReceiverRun()
            {
                LogThread("Enter ReceiverRun");
                while (true)
                {
                    var msg = ReadNextMessage();
                    LogThread("ReadNextMessage: " + msg);
                    var tcs = _requests[msg];
                    tcs.SetResult(new Response { message = msg, threadId = Thread.CurrentThread.ManagedThreadId });
                    _token.ThrowIfCancellationRequested(); // this is how we terminate the loop
                }
            }

            Task<Response> SendAwaitResponse(string msg)
            {
                LogThread("SendAwaitResponse: " + msg);
                var tcs = new TaskCompletionSource<Response>();
                _requests.TryAdd(msg, tcs);
                _messages.Enqueue(msg);
                return tcs.Task;
            }

            public async Task ProcessAsync()
            {
                LogThread("Enter Worker.ProcessAsync");

                var task1 = SendAwaitResponse("first message");
                await task1;
                LogThread("result1: " + task1.Result.message);
                // avoid deadlock for task2.Wait() with Task.Yield()
                // comment this out and task2.Wait() will dead-lock
                if (task1.Result.threadId == Thread.CurrentThread.ManagedThreadId)
                    await Task.Yield();

                var task2 = SendAwaitResponse("second message");
                task2.Wait();
                LogThread("result2: " + task2.Result.message);

                var task3 = SendAwaitResponse("third message");
                // still on the same thread as with result 2, no deadlock for task3.Wait()
                task3.Wait();
                LogThread("result3: " + task3.Result.message);

                var task4 = SendAwaitResponse("fourth message");
                await task4;
                LogThread("result4: " + task4.Result.message);
                // avoid deadlock for task5.Wait() with Task.Yield()
                // comment this out and task5.Wait() will dead-lock
                if (task4.Result.threadId == Thread.CurrentThread.ManagedThreadId)
                    await Task.Yield();

                var task5 = SendAwaitResponse("fifth message");
                task5.Wait();
                LogThread("result5: " + task5.Result.message);

                LogThread("Leave Worker.ProcessAsync");
            }

            public static void LogThread(string message)
            {
                Console.WriteLine("{0}, thread: {1}", message, Thread.CurrentThread.ManagedThreadId);
            }
        }

        static void Main(string[] args)
        {
            Worker.LogThread("Enter Main");
            var cts = new CancellationTokenSource(5000); // cancel after 5s
            var worker = new Worker(cts.Token);
            Task receiver = Task.Run(() => worker.ReceiverRun());
            Task main = worker.ProcessAsync();
            try
            {
                Task.WaitAll(main, receiver);
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: " + e.Message);
            }
            Worker.LogThread("Leave Main");
            Console.ReadLine();
        }
    }
}

Task.Run(() => task.SetResult(msg))これは、内部で行うことと大差ありませんReceiverRun。私が考える唯一の利点は、スレッドをいつ切り替えるかを明示的に制御できることです。このようにして、できるだけ長く同じスレッドにとどまることができます (たとえば、task2task3、しかし、 でのデッドロックを回避するために、task4後で別のスレッド切り替えが必要です)。task4task5.Wait()

どちらのソリューションも最終的にはスレッド プールが大きくなり、パフォーマンスとスケーラビリティの点で不利になります。

ここで、上記のコード内のどこでも置き換えるtask.Wait()await task、使用する必要がなくなり、デッドロックも発生しなくなります。ただし、最初の内部の後の呼び出しのチェーン全体は、実際にはスレッド上で実行されます。このスレッドを他のスタイルの呼び出しでブロックせず、メッセージを処理しているときに CPU バウンドの作業をあまり行わない限り、このアプローチは問題なく機能する可能性があります (非同期 IO にバインドされたスタイルの呼び出しは引き続きOK、実際には暗黙的なスレッド切り替えをトリガーする可能性があります)。ProcessAsyncawait Task.Yieldawaitawait task1ProcessAsyncReceiverRunWait()await

とは言っても、メッセージを処理するためにシリアル化同期コンテキストがインストールされた別のスレッドが必要になると思います( と同様WindowsFormsSynchronizationContext)。それは、非同期コードを含む場所awaitsで実行する必要があります。Task.Waitそのスレッドでの使用を避ける必要があります。また、個々のメッセージ処理に CPU バウンドの作業が多くかかる場合はTask.Run、そのような作業に を使用する必要があります。非同期 IO バウンド呼び出しの場合、同じスレッドにとどまることができます。

非同期メッセージ処理ロジックについては、@StephenClearyのNito ActionDispatcherAsynchronous Libraryを参照してください。うまくいけば、スティーブンが飛び込んで、より良い答えを提供します.ActionDispatcherSynchronizationContext

于 2013-10-21T07:43:12.490 に答える
0

「私の仮定では、2 番目の SendAwaitResponse は ThreadPool スレッドで実行されますが、ReceiverRun 用に作成されたスレッドで続行されます。」

SendAwaitResponse 内で何をするかに完全に依存します。非同期性と並行性は同じものではありません

チェックアウト: C# 5 Async/Await - それは*同時*ですか?

于 2013-10-20T19:56:48.417 に答える