12

ワークフローに再試行ポリシーを導入する必要があります。このように接続された 3 つのブロックがあるとします。

var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };
var buffer = new BufferBlock<int>();
var processing = new TransformBlock<int, int>(..., executionOptions);
var send = new ActionBlock<int>(...);

buffer.LinkTo(processing);
processing.LinkTo(send);

そのため、データを蓄積するバッファがあり、一度に 3 つ以下のアイテムを処理する変換ブロックに送信し、結果をアクション ブロックに送信します。

潜在的に変換ブロックの処理中に一時的なエラーが発生する可能性があります。エラーが一時的なものである場合は、ブロックを数回再試行します。

ブロックは一般的に再試行できないことを私は知っています (ブロックに渡されたデリゲートは再試行可能にすることができます)。オプションの 1 つは、渡されたデリゲートをラップして再試行をサポートすることです。

TransientFaultHandling.Coreまた、一時的な障害に対する再試行メカニズムを提供する非常に優れたライブラリがあることも知っています。これは優れたライブラリですが、私の場合はそうではありません。変換ブロックに渡されたデリゲートをRetryPolicy.ExecuteAsyncメソッドにラップすると、変換ブロック内のメッセージがロックされ、再試行が完了するか失敗するまで、変換ブロックは新しいメッセージを受信できなくなります。3 つのメッセージすべてが再試行に入力され (たとえば、次の再試行は 2 分後に行われるとします)、失敗した場合、少なくとも 1 つのメッセージが変換ブロックを離れるまで、変換ブロックは停止します。

私が見る唯一の解決策は、を拡張しTranformBlock(実際にITargetBlockは十分です)、手動で再試行することです(ここからのように):

do
 {
    try { return await transform(input); }
    catch
    { 
        if( numRetries <= 0 ) throw;
        else Task.Delay(timeout).ContinueWith(t => processing.Post(message));
    }
 } while( numRetries-- > 0 );

ig を使用して、メッセージを変換ブロック内に遅延して再度配置しますが、この場合、再試行コンテキスト (残りの再試行回数など) もこのブロックに渡す必要があります。あまりにも複雑に聞こえます...

ワークフロー ブロックの再試行ポリシーを実装するためのより簡単なアプローチを見た人はいますか?

4

3 に答える 3

3

svick の優れた回答に加えて、他にもいくつかのオプションがあります。

  1. 使用できますTransientFaultHandling.Core-他のメッセージが通過できるように設定MaxDegreeOfParallelismするだけです。Unbounded
  2. LinkToブロックの出力タイプを変更して失敗の表示と再試行回数を含め、データフロー ループを作成して、別の再試行が必要かどうかを調べるフィルターを に渡すことができます。このアプローチはより複雑です。ブロックが再試行を行っている場合はブロックに遅延を追加しTransformBlock、残りのメッシュの失敗/再試行情報を削除するには a を追加する必要があります。
于 2013-07-05T02:58:17.397 に答える