アプリはコンソール アプリであるため、既定の同期コンテキストで実行されます。この場合await
、待機中のタスクが完了した同じスレッドで継続コールバックが呼び出されます。の後にスレッドを切り替えたい場合はawait SendAwaitResponse
、次のようにしawait Task.Yield()
ます。
await SendAwaitResponse("first message");
await Task.Yield();
// will be continued on a pool thread
// ...
SendAwaitResponse("second message").Wait(); // so no deadlock
これをさらに改善するには、Thread.CurrentThread.ManagedThreadId
内部に保存Task.Result
し、await
. まだ同じスレッドにいる場合は、実行してくださいawait Task.Yield()
。
SendAwaitResponse
これは実際のコードの簡略化されたバージョンであることは理解していますが、内部では完全に同期しています(質問で示した方法)。そこにスレッドスイッチがあると期待するのはなぜですか?
とにかく、おそらく、現在どのスレッドにいるのかを推測しないように、ロジックを再設計する必要があります。混合await
を避けTask.Wait()
、すべてのコードを非同期にします。通常、Wait()
トップ レベルのどこかに 1 つだけ (例: inside Main
) を配置することができます。
[編集済み]task.SetResult(msg)
from を呼び出すと、デフォルトの同期コンテキストの動作により、スレッド スイッチなしReceiverRun
で実際に制御フローが転送されawait
ます。したがって、実際のメッセージ処理を行うコードがスレッドtask
を引き継いでいます。ReceiverRun
最終的SendAwaitResponse("second message").Wait()
に、同じスレッドで呼び出され、デッドロックが発生します。
以下は、サンプルをモデルにしたコンソール アプリのコードです。await Task.Yield()
内部を使用ProcessAsync
して別のスレッドで継続をスケジュールするため、制御フローが戻り、ReceiverRun
デッドロックは発生しません。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
class Program
{
class Worker
{
public struct Response
{
public string message;
public int threadId;
}
CancellationToken _token;
readonly ConcurrentQueue<string> _messages = new ConcurrentQueue<string>();
readonly ConcurrentDictionary<string, TaskCompletionSource<Response>> _requests = new ConcurrentDictionary<string, TaskCompletionSource<Response>>();
public Worker(CancellationToken token)
{
_token = token;
}
string ReadNextMessage()
{
// using Thread.Sleep(100) for test purposes here,
// should be using ManualResetEvent (or similar synchronization primitive),
// depending on how messages arrive
string message;
while (!_messages.TryDequeue(out message))
{
Thread.Sleep(100);
_token.ThrowIfCancellationRequested();
}
return message;
}
public void ReceiverRun()
{
LogThread("Enter ReceiverRun");
while (true)
{
var msg = ReadNextMessage();
LogThread("ReadNextMessage: " + msg);
var tcs = _requests[msg];
tcs.SetResult(new Response { message = msg, threadId = Thread.CurrentThread.ManagedThreadId });
_token.ThrowIfCancellationRequested(); // this is how we terminate the loop
}
}
Task<Response> SendAwaitResponse(string msg)
{
LogThread("SendAwaitResponse: " + msg);
var tcs = new TaskCompletionSource<Response>();
_requests.TryAdd(msg, tcs);
_messages.Enqueue(msg);
return tcs.Task;
}
public async Task ProcessAsync()
{
LogThread("Enter Worker.ProcessAsync");
var task1 = SendAwaitResponse("first message");
await task1;
LogThread("result1: " + task1.Result.message);
// avoid deadlock for task2.Wait() with Task.Yield()
// comment this out and task2.Wait() will dead-lock
if (task1.Result.threadId == Thread.CurrentThread.ManagedThreadId)
await Task.Yield();
var task2 = SendAwaitResponse("second message");
task2.Wait();
LogThread("result2: " + task2.Result.message);
var task3 = SendAwaitResponse("third message");
// still on the same thread as with result 2, no deadlock for task3.Wait()
task3.Wait();
LogThread("result3: " + task3.Result.message);
var task4 = SendAwaitResponse("fourth message");
await task4;
LogThread("result4: " + task4.Result.message);
// avoid deadlock for task5.Wait() with Task.Yield()
// comment this out and task5.Wait() will dead-lock
if (task4.Result.threadId == Thread.CurrentThread.ManagedThreadId)
await Task.Yield();
var task5 = SendAwaitResponse("fifth message");
task5.Wait();
LogThread("result5: " + task5.Result.message);
LogThread("Leave Worker.ProcessAsync");
}
public static void LogThread(string message)
{
Console.WriteLine("{0}, thread: {1}", message, Thread.CurrentThread.ManagedThreadId);
}
}
static void Main(string[] args)
{
Worker.LogThread("Enter Main");
var cts = new CancellationTokenSource(5000); // cancel after 5s
var worker = new Worker(cts.Token);
Task receiver = Task.Run(() => worker.ReceiverRun());
Task main = worker.ProcessAsync();
try
{
Task.WaitAll(main, receiver);
}
catch (Exception e)
{
Console.WriteLine("Exception: " + e.Message);
}
Worker.LogThread("Leave Main");
Console.ReadLine();
}
}
}
Task.Run(() => task.SetResult(msg))
これは、内部で行うことと大差ありませんReceiverRun
。私が考える唯一の利点は、スレッドをいつ切り替えるかを明示的に制御できることです。このようにして、できるだけ長く同じスレッドにとどまることができます (たとえば、task2
、task3
、しかし、 でのデッドロックを回避するために、task4
後で別のスレッド切り替えが必要です)。task4
task5.Wait()
どちらのソリューションも最終的にはスレッド プールが大きくなり、パフォーマンスとスケーラビリティの点で不利になります。
ここで、上記のコード内のどこでも置き換えるtask.Wait()
とawait task
、使用する必要がなくなり、デッドロックも発生しなくなります。ただし、最初の内部の後の呼び出しのチェーン全体は、実際にはスレッド上で実行されます。このスレッドを他のスタイルの呼び出しでブロックせず、メッセージを処理しているときに CPU バウンドの作業をあまり行わない限り、このアプローチは問題なく機能する可能性があります (非同期 IO にバインドされたスタイルの呼び出しは引き続きOK、実際には暗黙的なスレッド切り替えをトリガーする可能性があります)。ProcessAsync
await Task.Yield
await
await task1
ProcessAsync
ReceiverRun
Wait()
await
とは言っても、メッセージを処理するためにシリアル化同期コンテキストがインストールされた別のスレッドが必要になると思います( と同様WindowsFormsSynchronizationContext
)。それは、非同期コードを含む場所awaits
で実行する必要があります。Task.Wait
そのスレッドでの使用を避ける必要があります。また、個々のメッセージ処理に CPU バウンドの作業が多くかかる場合はTask.Run
、そのような作業に を使用する必要があります。非同期 IO バウンド呼び出しの場合、同じスレッドにとどまることができます。
非同期メッセージ処理ロジックについては、@StephenClearyのNito
ActionDispatcher
Asynchronous Libraryを参照してください。うまくいけば、スティーブンが飛び込んで、より良い答えを提供します.ActionDispatcherSynchronizationContext