ワークフローに再試行ポリシーを導入する必要があります。このように接続された 3 つのブロックがあるとします。
var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };
var buffer = new BufferBlock<int>();
var processing = new TransformBlock<int, int>(..., executionOptions);
var send = new ActionBlock<int>(...);
buffer.LinkTo(processing);
processing.LinkTo(send);
そのため、データを蓄積するバッファがあり、一度に 3 つ以下のアイテムを処理する変換ブロックに送信し、結果をアクション ブロックに送信します。
潜在的に変換ブロックの処理中に一時的なエラーが発生する可能性があります。エラーが一時的なものである場合は、ブロックを数回再試行します。
ブロックは一般的に再試行できないことを私は知っています (ブロックに渡されたデリゲートは再試行可能にすることができます)。オプションの 1 つは、渡されたデリゲートをラップして再試行をサポートすることです。
TransientFaultHandling.Core
また、一時的な障害に対する再試行メカニズムを提供する非常に優れたライブラリがあることも知っています。これは優れたライブラリですが、私の場合はそうではありません。変換ブロックに渡されたデリゲートをRetryPolicy.ExecuteAsync
メソッドにラップすると、変換ブロック内のメッセージがロックされ、再試行が完了するか失敗するまで、変換ブロックは新しいメッセージを受信できなくなります。3 つのメッセージすべてが再試行に入力され (たとえば、次の再試行は 2 分後に行われるとします)、失敗した場合、少なくとも 1 つのメッセージが変換ブロックを離れるまで、変換ブロックは停止します。
私が見る唯一の解決策は、を拡張しTranformBlock
(実際にITargetBlock
は十分です)、手動で再試行することです(ここからのように):
do
{
try { return await transform(input); }
catch
{
if( numRetries <= 0 ) throw;
else Task.Delay(timeout).ContinueWith(t => processing.Post(message));
}
} while( numRetries-- > 0 );
ig を使用して、メッセージを変換ブロック内に遅延して再度配置しますが、この場合、再試行コンテキスト (残りの再試行回数など) もこのブロックに渡す必要があります。あまりにも複雑に聞こえます...
ワークフロー ブロックの再試行ポリシーを実装するためのより簡単なアプローチを見た人はいますか?