9

私はいくつかの async await に少し苦労しています。一部のプログラム間でメッセージを送受信するために RabbitMQ を使用しています。

少し背景として、RabbitMQ クライアントは 3 つほどのスレッドを使用していることがわかります。1 つの接続スレッドと 2 つのハートビート スレッドです。メッセージが TCP 経由で受信されるたびに、接続スレッドがそれを処理し、インターフェイス経由で提供したコールバックを呼び出します。ドキュメントによると、この呼び出しは接続と同じスレッドで行われ、処理を続行する必要があるため、この呼び出し中に多くの作業を行うことは避けるのが最善です。これらはQueueingBasicConsumer、メッセージの受信を待機するために使用されるブロッキング 'Dequeue' メソッドを持つ を提供します。

この待機時間中に消費者が実際にスレッド コンテキストを解放できるようにして、他の誰かが何らかの作業を行えるようにしたかったので、async/await タスクを使用することにしました。次の方法で sを使用するAwaitableBasicConsumerクラスを作成しました。TaskCompletionSource

待機可能な Dequeue メソッドがあります。

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
    //we are enqueueing a TCS. This is a "read"
    rwLock.EnterReadLock();

    try
    {
        TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>();

        //if we are cancelled before we finish, this will cause the tcs to become cancelled
        cancellationToken.Register(() =>
        {
            tcs.TrySetCanceled();
        });

        //if there is something in the undelivered queue, the task will be immediately completed
        //otherwise, we queue the task into deliveryTCS
        if (!TryDeliverUndelivered(tcs))
            deliveryTCS.Enqueue(tcs);
        }

        return tcs.Task;
    }
    finally
    {
        rwLock.ExitReadLock();
    }
}

rabbitmq クライアントが呼び出すコールバックは、タスクを実行します。これは、AMQP 接続スレッドのコンテキストから呼び出されます。

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
    //we want nothing added while we remove. We also block until everybody is done.
    rwLock.EnterWriteLock();
    try
    {
        RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

        bool sent = false;
        TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
        while (deliveryTCS.TryDequeue(out tcs))
        {
            //once we manage to actually set somebody's result, we are done with handling this
            if (tcs.TrySetResult(e))
            {
                sent = true;
                break;
            }
        }

        //if nothing was sent, we queue up what we got so that somebody can get it later.
        /**
         * Without the rwlock, this logic would cause concurrency problems in the case where after the while block completes without sending, somebody enqueues themselves. They would get the
         * next message and the person who enqueues after them would get the message received now. Locking prevents that from happening since nobody can add to the queue while we are
         * doing our thing here.
         */
        if (!sent)
        {
            undelivered.Enqueue(e);
        }
    }
    finally
    {
        rwLock.ExitWriteLock();
    }
}

rwLockですReaderWriterLockSlim。2 つのキュー (deliveryTCSおよびundelivered) は ConcurrentQueues です。

問題:

時々、dequeue メソッドを待機しているメソッドが例外をスローします。そのメソッドも同様asyncであり、タスクが入る「例外」完了状態になるため、これは通常は問題になりません。DequeueAsyncこの問題は、RabbitMQ クライアントが作成する AMQP 接続スレッドでの待機後に、呼び出したタスクが再開される状況で発生します。通常、タスクがメイン スレッドまたはワーカー スレッドのいずれかで再開されるのを見てきました。ただし、AMQP スレッドで再開し、例外がスローされると、すべてが停止します。タスクは「例外状態」に移行せず、AMQP 接続スレッドは、例外が発生したメソッドを実行していると伝えたままになります。

ここでの私の主な混乱は、これが機能しない理由です。

var task = c.RunAsync(); //<-- This method awaits the DequeueAsync and throws an exception afterwards

ConsumerTaskState state = new ConsumerTaskState()
{
    Connection = connection,
    CancellationToken = cancellationToken
};

//if there is a problem, we execute our faulted method
//PROBLEM: If task fails when its resumed onto the AMQP thread, this method is never called
task.ContinueWith(this.OnFaulted, state, TaskContinuationOptions.OnlyOnFaulted);

RunAsyncテスト用に設定されたメソッドは次のとおりです。

public async Task RunAsync()
{
    using (var channel = this.Connection.CreateModel())
    {
        ...
        AwaitableBasicConsumer consumer = new AwaitableBasicConsumer(channel);
        var result = consumer.DequeueAsync(this.CancellationToken);

        //wait until we find something to eat
        await result;

        throw new NotImplementeException(); //<-- the test exception. Normally this causes OnFaulted to be called, but sometimes, it stalls
        ...
    } //<-- This is where the debugger says the thread is sitting at when I find it in the stalled state
}

私が書いたものを読むと、私は自分の問題をうまく説明できていないかもしれないことがわかります。説明が必要な場合は、質問してください。

私が思いついた解決策は次のとおりです。

  • すべての Async/Await コードを削除し、まっすぐなスレッドとブロックのみを使用してください。パフォーマンスは低下しますが、少なくとも失速することはありません
  • どういうわけか、AMQP スレッドがタスクの再開に使用されないようにします。私は彼らが眠っているか何かだったと思いますが、デフォルトTaskSchedulerではそれらを使用することにしました。これらのスレッドが立ち入り禁止であることをタスク スケジューラに伝える方法を見つけることができれば、それは素晴らしいことです。

なぜこれが起こっているのか、またはこれを解決するための提案について誰かが説明していますか? 現在、プログラムの信頼性を高めるために非同期コードを削除していますが、ここで何が起こっているのかを本当に理解したいと思っています。

4

1 に答える 1

5

最初に、 がコンテキストを取得し、それを使用して実行を再開する方法を正確な用語で説明している私のasync紹介を読むことをお勧めします。await要するに、それは現在SynchronizationContext(または現在のTaskScheduler場合SynchronizationContext.Currentnull)をキャプチャします。

その他の重要な詳細は、async継続がスケジュールTaskContinuationOptions.ExecuteSynchronouslyされていることです (@svick がコメントで指摘したように)。これについてのブログ投稿がありますが、公式にはどこにも文書化されていません。この詳細により、プロデューサー/コンシューマー キューの作成async困難になります。

「元のコンテキストに戻らない」理由awaitは、(おそらく) RabbitMQ スレッドにSynchronizationContextorがないためです。したがって、これらのスレッドは通常のスレッド プール スレッドのように見えるため、TaskScheduler呼び出し時に継続が直接実行されます。TrySetResult

ところで、あなたのコードを読んで、リーダー/ライター ロックと同時キューの使用が間違っていると思われます。コード全体を見ないとわかりませんが、それが私の印象です。

既存のキューを使用し、それを中心にコンシューマーを構築することを強くお勧めしますasync(つまり、難しい部分は他の人に任せてください :)。TPL データフローBufferBlock<T>型はキューとして機能できます。プラットフォームで Dataflow を利用できる場合、これが私の最初の推奨事項です。それ以外の場合は、私の AsyncEx ライブラリに型があります。または、独自の型を作成することもできます(ブログで説明しています)。asyncAsyncProducerConsumerQueue

を使用した例を次に示しBufferBlock<T>ます。

private readonly BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs> _queue = new BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs>();

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
    RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
    _queue.Post(e);
}

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
    return _queue.ReceiveAsync(cancellationToken);
}

この例では、DequeueAsyncAPI を保持しています。ただし、TPL Dataflow の使用を開始したら、他の場所でも使用することを検討してください。このようなキューが必要な場合、データフロー アプローチの恩恵を受けるコードの他の部分を見つけるのが一般的です。たとえば、 を呼び出す一連のメソッドを用意する代わりに、 を にリンクDequeueAsyncできます。BufferBlockActionBlock

于 2013-10-31T00:28:59.097 に答える