17

http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9に相互投稿

私は知っています...私はTplDataflowを最大限に活用していません。ATMBufferBlockプロデューサーとコンシューマーが異なる速度で実行されているメッセージ パッシング用の安全なキューとして使用しているだけです。どうすればよいのか途方に暮れる奇妙な動作が見られます。

private BufferBlock<object> messageQueue = new BufferBlock<object>();

public void Send(object message)
{
    var accepted=messageQueue.Post(message);
    logger.Info("Send message was called qlen = {0} accepted={1}",
    messageQueue.Count,accepted);
}

public async Task<object> GetMessageAsync()
{
    try
    {
        var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
        //despite messageQueue.Count>0 next line 
        //occasionally does not execute
        logger.Info("message received");
        //.......
    }
    catch(TimeoutException)
    {
        //do something
    }
}

上記のコード (2000 行の分散ソリューションの一部) では、約Send100 ミリ秒ごとに定期的に呼び出されています。これは、アイテムが1 秒間に約 10 回Post編集されることを意味します。messageQueueこれは確認済みです。ただし、ReceiveAsyncタイムアウト内に が完了せず (つまり が完了しない)、30 秒後にPost発生する場合があります。この時点で数百に上ります。これは予想外です。この問題は、投稿速度が遅い場合 (1 投稿/秒) にも見られ、通常、1000 個のアイテムが を通過する前に発生します。ReceiveAsyncTimeoutExceptionmessageQueue.CountBufferBlock

したがって、この問題を回避するために、次のコードを使用しています。これは機能しますが、受信時に 1 秒の遅延が発生することがあります (上記のバグが発生するため)。

    public async Task<object> GetMessageAsync()
    {
        try
        {
            object m;
            var attempts = 0;
            for (; ; )
            {
                try
                {
                    m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
                }
                catch (TimeoutException)
                {
                    attempts++;
                    if (attempts >= 30) throw;
                    continue;
                }
                break;

            }

            logger.Info("message received");
            //.......
        }
        catch(TimeoutException)
        {
            //do something
        }
   }

BufferBlockこれは私には TDF の競合状態のように見えますが、同様の方法で使用する他の場所でこれが発生しない理由を突き止めることはできません。実験的に からReceiveAsyncに変更しReceiveても役に立ちません。私は確認していませんが、上記のコードは完全に機能すると思います。これは、「TPL データフローの概要」tpldataflow.docxに記載されているパターンです。

この問題の根底にたどり着くにはどうすればよいですか?何が起こっているかを推測するのに役立つメトリクスはありますか? 信頼できるテスト ケースを作成できない場合、他にどのような情報を提供できますか?

ヘルプ!

4

1 に答える 1