私はいくつかの async await に少し苦労しています。一部のプログラム間でメッセージを送受信するために RabbitMQ を使用しています。
少し背景として、RabbitMQ クライアントは 3 つほどのスレッドを使用していることがわかります。1 つの接続スレッドと 2 つのハートビート スレッドです。メッセージが TCP 経由で受信されるたびに、接続スレッドがそれを処理し、インターフェイス経由で提供したコールバックを呼び出します。ドキュメントによると、この呼び出しは接続と同じスレッドで行われ、処理を続行する必要があるため、この呼び出し中に多くの作業を行うことは避けるのが最善です。これらはQueueingBasicConsumer
、メッセージの受信を待機するために使用されるブロッキング 'Dequeue' メソッドを持つ を提供します。
この待機時間中に消費者が実際にスレッド コンテキストを解放できるようにして、他の誰かが何らかの作業を行えるようにしたかったので、async/await タスクを使用することにしました。次の方法で sを使用するAwaitableBasicConsumer
クラスを作成しました。TaskCompletionSource
待機可能な Dequeue メソッドがあります。
public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
//we are enqueueing a TCS. This is a "read"
rwLock.EnterReadLock();
try
{
TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>();
//if we are cancelled before we finish, this will cause the tcs to become cancelled
cancellationToken.Register(() =>
{
tcs.TrySetCanceled();
});
//if there is something in the undelivered queue, the task will be immediately completed
//otherwise, we queue the task into deliveryTCS
if (!TryDeliverUndelivered(tcs))
deliveryTCS.Enqueue(tcs);
}
return tcs.Task;
}
finally
{
rwLock.ExitReadLock();
}
}
rabbitmq クライアントが呼び出すコールバックは、タスクを実行します。これは、AMQP 接続スレッドのコンテキストから呼び出されます。
public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
//we want nothing added while we remove. We also block until everybody is done.
rwLock.EnterWriteLock();
try
{
RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
bool sent = false;
TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
while (deliveryTCS.TryDequeue(out tcs))
{
//once we manage to actually set somebody's result, we are done with handling this
if (tcs.TrySetResult(e))
{
sent = true;
break;
}
}
//if nothing was sent, we queue up what we got so that somebody can get it later.
/**
* Without the rwlock, this logic would cause concurrency problems in the case where after the while block completes without sending, somebody enqueues themselves. They would get the
* next message and the person who enqueues after them would get the message received now. Locking prevents that from happening since nobody can add to the queue while we are
* doing our thing here.
*/
if (!sent)
{
undelivered.Enqueue(e);
}
}
finally
{
rwLock.ExitWriteLock();
}
}
rwLock
ですReaderWriterLockSlim
。2 つのキュー (deliveryTCS
およびundelivered
) は ConcurrentQueues です。
問題:
時々、dequeue メソッドを待機しているメソッドが例外をスローします。そのメソッドも同様async
であり、タスクが入る「例外」完了状態になるため、これは通常は問題になりません。DequeueAsync
この問題は、RabbitMQ クライアントが作成する AMQP 接続スレッドでの待機後に、呼び出したタスクが再開される状況で発生します。通常、タスクがメイン スレッドまたはワーカー スレッドのいずれかで再開されるのを見てきました。ただし、AMQP スレッドで再開し、例外がスローされると、すべてが停止します。タスクは「例外状態」に移行せず、AMQP 接続スレッドは、例外が発生したメソッドを実行していると伝えたままになります。
ここでの私の主な混乱は、これが機能しない理由です。
var task = c.RunAsync(); //<-- This method awaits the DequeueAsync and throws an exception afterwards
ConsumerTaskState state = new ConsumerTaskState()
{
Connection = connection,
CancellationToken = cancellationToken
};
//if there is a problem, we execute our faulted method
//PROBLEM: If task fails when its resumed onto the AMQP thread, this method is never called
task.ContinueWith(this.OnFaulted, state, TaskContinuationOptions.OnlyOnFaulted);
RunAsync
テスト用に設定されたメソッドは次のとおりです。
public async Task RunAsync()
{
using (var channel = this.Connection.CreateModel())
{
...
AwaitableBasicConsumer consumer = new AwaitableBasicConsumer(channel);
var result = consumer.DequeueAsync(this.CancellationToken);
//wait until we find something to eat
await result;
throw new NotImplementeException(); //<-- the test exception. Normally this causes OnFaulted to be called, but sometimes, it stalls
...
} //<-- This is where the debugger says the thread is sitting at when I find it in the stalled state
}
私が書いたものを読むと、私は自分の問題をうまく説明できていないかもしれないことがわかります。説明が必要な場合は、質問してください。
私が思いついた解決策は次のとおりです。
- すべての Async/Await コードを削除し、まっすぐなスレッドとブロックのみを使用してください。パフォーマンスは低下しますが、少なくとも失速することはありません
- どういうわけか、AMQP スレッドがタスクの再開に使用されないようにします。私は彼らが眠っているか何かだったと思いますが、デフォルト
TaskScheduler
ではそれらを使用することにしました。これらのスレッドが立ち入り禁止であることをタスク スケジューラに伝える方法を見つけることができれば、それは素晴らしいことです。
なぜこれが起こっているのか、またはこれを解決するための提案について誰かが説明していますか? 現在、プログラムの信頼性を高めるために非同期コードを削除していますが、ここで何が起こっているのかを本当に理解したいと思っています。