25

IntroToRxで、著者は、一定期間後にネットワーク要求のような I/O 要求を再試行する I/O の「スマートな」再試行を作成することを提案しています。

正確な段落は次のとおりです。

独自のライブラリに追加する便利な拡張メソッドは、"Back Off and Retry" メソッドです。私が一緒に働いたチームは、I/O、特にネットワーク リクエストを実行するときに、このような機能が役立つことを発見しました。概念は、試行し、失敗した場合は一定時間待ってから再試行することです。このメソッドのバージョンでは、再試行する例外の種類と再試行の最大回数が考慮される場合があります。後続の各再試行で攻撃的でないように、待機期間を長くすることもできます。

残念ながら、このメソッドの書き方がわかりません。:(

4

6 に答える 6

40

このバックオフ再試行の実装の鍵はdeferred observablesです。遅延オブザーバブルは、誰かがサブスクライブするまでファクトリを実行しません。また、サブスクリプションごとにファクトリが呼び出されるため、再試行のシナリオに最適です。

ネットワーク要求をトリガーするメソッドがあるとします。

public IObservable<WebResponse> SomeApiMethod() { ... }

この小さなスニペットの目的のために、遅延オブジェクトを次のように定義しましょう。source

var source = Observable.Defer(() => SomeApiMethod());

誰かがソースをサブスクライブするたびに、SomeApiMethod が呼び出され、新しい Web リクエストが開始されます。失敗したときに再試行する単純な方法は、組み込みの Retry 演算子を使用することです。

source.Retry(4)

しかし、それは API にとってあまり良いことではなく、あなたが求めているものでもありません。各試行の間にリクエストの開始を遅らせる必要があります。これを行う 1 つの方法は、遅延サブスクリプションを使用することです。

Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4)

最初のリクエストでも遅延が追加されるため、これは理想的ではありません。修正しましょう。

int attempt = 0;
Observable.Defer(() => { 
   return ((++attempt == 1)  ? source : source.DelaySubscription(TimeSpan.FromSeconds(1)))
})
.Retry(4)
.Select(response => ...)

ただし、1 秒間一時停止するだけでは、再試行の方法としてはあまり適切ではありません。この定数を、再試行カウントを受け取って適切な遅延を返す関数に変更しましょう。指数バックオフは実装が簡単です。

Func<int, TimeSpan> strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))

再試行する例外を指定する方法を追加する必要があるだけです。再試行する意味があるかどうかにかかわらず、例外が返される関数を追加しましょう。これを retryOnError と呼びます。

ここで、恐ろしく見えるコードを書く必要がありますが、我慢してください。

Observable.Defer(() => {
    return ((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))
        .Select(item => new Tuple<bool, WebResponse, Exception>(true, item, null))
        .Catch<Tuple<bool, WebResponse, Exception>, Exception>(e => retryOnError(e)
            ? Observable.Throw<Tuple<bool, WebResponse, Exception>>(e)
            : Observable.Return(new Tuple<bool, WebResponse, Exception>(false, null, e)));
})
.Retry(retryCount)
.SelectMany(t => t.Item1
    ? Observable.Return(t.Item2)
    : Observable.Throw<T>(t.Item3))

これらの山括弧はすべて、 を超えて再試行してはならない例外をマーシャリングするためにあり.Retry()ます。内側のオブザーバブルをIObservable<Tuple<bool, WebResponse, Exception>>、最初の bool が応答または例外があるかどうかを示す場所にしました。retryOnError が特定の例外を再試行する必要があることを示している場合、内部のオブザーバブルがスローされ、再試行によって取得されます。SelectMany はタプルをアンラップし、結果のオブザーバブルをIObservable<WebRequest>再度作成します。

最終バージョンの完全なソースとテストを含む私の要点を参照してください。この演算子を使用すると、再試行コードを非常に簡潔に記述できます

Observable.Defer(() => SomApiMethod())
  .RetryWithBackoffStrategy(
     retryCount: 4, 
     retryOnError: e => e is ApiRetryWebException
  )
于 2013-09-25T09:04:06.453 に答える
13

状況を単純化しすぎているのかもしれませんが、Retry の実装を見ると、それは単に、オブザーバブルの無限の列挙可能な Observable.Catch です。

private static IEnumerable<T> RepeatInfinite<T>(T value)
{
    while (true)
        yield return value;
}

public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source)
{
    return Observable.Catch<TSource>(QueryLanguage.RepeatInfinite<IObservable<TSource>(source));
}

したがって、このアプローチを採用する場合、最初の歩留まりの後に遅延を追加するだけで済みます。

private static IEnumerable<IObservable<TSource>> RepeateInfinite<TSource> (IObservable<TSource> source, TimeSpan dueTime)
{
    // Don't delay the first time        
    yield return source;

    while (true)
        yield return source.DelaySubscription(dueTime);
    }

public static IObservable<TSource> RetryAfterDelay<TSource>(this IObservable<TSource> source, TimeSpan dueTime)
{
    return RepeateInfinite(source, dueTime).Catch();
}

再試行回数を指定して特定の例外をキャッチするオーバーロードは、さらに簡潔にすることができます。

public static IObservable<TSource> RetryAfterDelay<TSource, TException>(this IObservable<TSource> source, TimeSpan dueTime, int count) where TException : Exception
{
    return source.Catch<TSource, TException>(exception =>
    {
        if (count <= 0)
        {
            return Observable.Throw<TSource>(exception);
        }

        return source.DelaySubscription(dueTime).RetryAfterDelay<TSource, TException>(dueTime, --count);
    });
}

ここでのオーバーロードは再帰を使用していることに注意してください。初見では、count が Int32.MaxValue のようなものである場合、StackOverflowException が発生する可能性があるように見えます。ただし、DelaySubscription はスケジューラを使用してサブスクリプション アクションを実行するため、スタック オーバーフローは発生しません (つまり、「トランポリン」を使用します)。ただし、これはコードを見ても明らかではないと思います。DelaySubscription オーバーロードのスケジューラを Scheduler.Immediate に明示的に設定し、TimeSpan.Zero と Int32.MaxValue を渡すことで、強制的にスタック オーバーフローを発生させることができます。非即時スケジューラを渡して、意図をもう少し明示的に表現することができます。次に例を示します。

return source.DelaySubscription(dueTime, TaskPoolScheduler.Default).RetryAfterDelay<TSource, TException>(dueTime, --count);

更新: 特定のスケジューラを取り込むためにオーバーロードを追加しました。

public static IObservable<TSource> RetryAfterDelay<TSource, TException>(
    this IObservable<TSource> source,
    TimeSpan retryDelay,
    int retryCount,
    IScheduler scheduler) where TException : Exception
{
    return source.Catch<TSource, TException>(
        ex =>
        {
            if (retryCount <= 0)
            {
                return Observable.Throw<TSource>(ex);
            }

            return
                source.DelaySubscription(retryDelay, scheduler)
                    .RetryAfterDelay<TSource, TException>(retryDelay, --retryCount, scheduler);
        });
} 
于 2013-11-22T12:46:58.163 に答える
2

これが私が使用しているものです:

public static IObservable<T> DelayedRetry<T>(this IObservable<T> src, TimeSpan delay)
{
    Contract.Requires(src != null);
    Contract.Ensures(Contract.Result<IObservable<T>>() != null);

    if (delay == TimeSpan.Zero) return src.Retry();
    return src.Catch(Observable.Timer(delay).SelectMany(x => src).Retry());
}
于 2013-11-28T17:35:19.990 に答える
2

Markus の回答に基づいて、次のように書きました。

public static class ObservableExtensions
{
    private static IObservable<T> BackOffAndRetry<T>(
        this IObservable<T> source,
        Func<int, TimeSpan> strategy,
        Func<int, Exception, bool> retryOnError,
        int attempt)
    {
        return Observable
            .Defer(() =>
            {
                var delay = attempt == 0 ? TimeSpan.Zero : strategy(attempt);
                var s = delay == TimeSpan.Zero ? source : source.DelaySubscription(delay);

                return s
                    .Catch<T, Exception>(e =>
                    {
                        if (retryOnError(attempt, e))
                        {
                            return source.BackOffAndRetry(strategy, retryOnError, attempt + 1);
                        }
                        return Observable.Throw<T>(e);
                    });
            });
    }

    public static IObservable<T> BackOffAndRetry<T>(
        this IObservable<T> source,
        Func<int, TimeSpan> strategy,
        Func<int, Exception, bool> retryOnError)
    {
        return source.BackOffAndRetry(strategy, retryOnError, 0);
    }
}

もっと好きだから

  • 変更はしませんattemptsが、再帰を使用します。
  • 使用しませんretriesが、試行回数をに渡しますretryOnError
于 2017-01-21T22:41:27.347 に答える
1

Rxxがどのようにそれを行うかを研究しているときに私が思いついた別のわずかに異なる実装を次に示します。したがって、これは主に Rxx のアプローチの縮小版です。

署名は、Markus のバージョンとは少し異なります。再試行する例外のタイプを指定すると、遅延戦略が例外と再試行回数を取得するため、連続する再試行ごとに遅延が長くなる可能性があります。

それがバグプルーフであること、または最良のアプローチであることを保証することはできませんが、うまくいくようです。

public static IObservable<TSource> RetryWithDelay<TSource, TException>(this IObservable<TSource> source, Func<TException, int, TimeSpan> delayFactory, IScheduler scheduler = null)
where TException : Exception
{
    return Observable.Create<TSource>(observer =>
    {
        scheduler = scheduler ?? Scheduler.CurrentThread;
        var disposable = new SerialDisposable();
        int retryCount = 0;

        var scheduleDisposable = scheduler.Schedule(TimeSpan.Zero,
        self =>
        {
            var subscription = source.Subscribe(
            observer.OnNext,
            ex =>
            {
                var typedException = ex as TException;
                if (typedException != null)
                {
                    var retryDelay = delayFactory(typedException, ++retryCount);
                    self(retryDelay);
                }
                else
                {
                    observer.OnError(ex);
                }
            },
            observer.OnCompleted);

            disposable.Disposable = subscription;
        });

        return new CompositeDisposable(scheduleDisposable, disposable);
    });
}
于 2013-09-26T04:07:16.570 に答える