Teaser : 皆さん、この質問は再試行ポリシーを実装する方法に関するものではありません。これは、TPL Dataflow ブロックの正しい完了に関するものです。
この質問は、以前の質問ITargetBlock 内の再試行ポリシーの続きです。TransformBlock
この質問に対する答えは、 (ソース) とTransformManyBlock
(ターゲット)を利用する @svick のスマートなソリューションでした。残っている唯一の問題は、このブロックを正しい方法で完了することです。最初にすべての再試行が完了するのを待ってから、ターゲット ブロックを完了します。これが私が最終的に得たものです(これは単なるスニペットです。スレッドセーフではないretries
セットにはあまり注意を払わないでください):
var retries = new HashSet<RetryingMessage<TInput>>();
TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
async message =>
{
try
{
var result = new[] { await transform(message.Data) };
retries.Remove(message);
return result;
}
catch (Exception ex)
{
message.Exceptions.Add(ex);
if (message.RetriesRemaining == 0)
{
if (failureHandler != null)
failureHandler(message.Exceptions);
retries.Remove(message);
}
else
{
retries.Add(message);
message.RetriesRemaining--;
Task.Delay(retryDelay)
.ContinueWith(_ => target.Post(message));
}
return null;
}
}, dataflowBlockOptions);
source.LinkTo(target);
source.Completion.ContinueWith(async _ =>
{
while (target.InputCount > 0 || retries.Any())
await Task.Delay(100);
target.Complete();
});
アイデアは、何らかのポーリングを実行し、処理を待機しているメッセージがまだあるかどうか、および再試行が必要なメッセージがないかどうかを確認することです。しかし、このソリューションでは、ポーリングのアイデアは好きではありません。
はい、再試行の追加/削除のロジックを別のクラスにカプセル化できます。たとえば、再試行のセットが空になったときに何らかのアクションを実行することもできますが、target.InputCount > 0
条件を処理するにはどうすればよいですか? ブロックに保留中のメッセージがない場合に呼び出されるようなコールバックは存在しないため、target.ItemCount
遅延の少ないループで検証するしかないようです。
これを達成するためのよりスマートな方法を知っている人はいますか?