http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9に相互投稿
私は知っています...私はTplDataflowを最大限に活用していません。ATMBufferBlock
プロデューサーとコンシューマーが異なる速度で実行されているメッセージ パッシング用の安全なキューとして使用しているだけです。どうすればよいのか途方に暮れる奇妙な動作が見られます。
private BufferBlock<object> messageQueue = new BufferBlock<object>();
public void Send(object message)
{
var accepted=messageQueue.Post(message);
logger.Info("Send message was called qlen = {0} accepted={1}",
messageQueue.Count,accepted);
}
public async Task<object> GetMessageAsync()
{
try
{
var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
//despite messageQueue.Count>0 next line
//occasionally does not execute
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
上記のコード (2000 行の分散ソリューションの一部) では、約Send
100 ミリ秒ごとに定期的に呼び出されています。これは、アイテムが1 秒間に約 10 回Post
編集されることを意味します。messageQueue
これは確認済みです。ただし、ReceiveAsync
タイムアウト内に が完了せず (つまり が完了しない)、30 秒後にPost
発生する場合があります。この時点で数百に上ります。これは予想外です。この問題は、投稿速度が遅い場合 (1 投稿/秒) にも見られ、通常、1000 個のアイテムが を通過する前に発生します。ReceiveAsync
TimeoutException
messageQueue.Count
BufferBlock
したがって、この問題を回避するために、次のコードを使用しています。これは機能しますが、受信時に 1 秒の遅延が発生することがあります (上記のバグが発生するため)。
public async Task<object> GetMessageAsync()
{
try
{
object m;
var attempts = 0;
for (; ; )
{
try
{
m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
}
catch (TimeoutException)
{
attempts++;
if (attempts >= 30) throw;
continue;
}
break;
}
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
BufferBlock
これは私には TDF の競合状態のように見えますが、同様の方法で使用する他の場所でこれが発生しない理由を突き止めることはできません。実験的に からReceiveAsync
に変更しReceive
ても役に立ちません。私は確認していませんが、上記のコードは完全に機能すると思います。これは、「TPL データフローの概要」tpldataflow.docxに記載されているパターンです。
この問題の根底にたどり着くにはどうすればよいですか?何が起こっているかを推測するのに役立つメトリクスはありますか? 信頼できるテスト ケースを作成できない場合、他にどのような情報を提供できますか?
ヘルプ!