3

RxJava でモデル化された相互依存の非同期操作のグラフがあります。エラーによっては、グラフ全体を再実行する必要があります。retry(..) すべてのサブスクライバーにエラーが表示されるため、オペレーターはこれを直接サポートしていません。retry(..)オペレーターは再サブスクライブするだけなので、最終的なオブザーバブルから一度だけ計算されたエラーを常に取得します。つまり、再サブスクライブ時に作業が再度実行されることはありません。

すべてのサブスクリプションに対してオブザーバブル生成メソッドを呼び出す特別なオブザーバブルを作成しようとしました。その場合、再試行演算子はほとんど希望どおりに機能し、追加のキャッシュ操作の後、希望どおりに正確に機能します。

ただし、これは非常に一般的であるように思われるため、RxJava のどこかで既に提供されている作業を繰り返していると思われます。また、RxJava の十分な知識がなくても低レベルで何かをしようとしていることを考えると、ソリューションの堅牢性についても懸念しています。もう 1 つの問題は構成可能性です。3 つの形式すべてをサポートするretry(..)には、3 つのバージョンのラッパー メソッドが必要になります。

以下のデモンストレーションは、私がやろうとしていることとこれまでの成功を説明しています。

RxJavaでこの種の再試行を行うためのより単純またはより慣用的な(またはその両方の)方法はありますか?

package demo;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Func0;
import rx.util.async.Async;

/**
 ** <p>
 * Demonstrate attempts to get RxJava retry for asynchronous work chain. The use
 * case that exposed this problem is reading and writing data with versioning
 * for optimistic concurrency. The work is a series of async I/O operations that
 * must be re-assembled from scratch if a stale version is detected on write.
 * </p>
 *
 * <p>
 * Four cases are demonstrated in this class:
 * </p>
 * <ul>
 * <li>Case 1: perform the work and naiively apply a retry operator to the
 * asynchronous work. This fails because the work itself is not retried on
 * re-subscribe.</li>
 * <li>Case 2: wrap the work in an observer that performs it on every
 * subscription. A retry operator applied to the wrapper correctly re-attempts
 * the work on failure. However, every subsequent subscriber to the result
 * causes the work to be performed again.</li>
 * <li>Case 3: Apply the cache operator to the result of the retry operator.
 * This performs as desired.</li>
 * <li>Case 4: Generalize the approach of case 3 and encapsulate it in an
 * observable generator method. This shows that it is difficult to generalize
 * this behavior because each retry operator form (number, predicate, perpetual)
 * will require its own generator method.</li>
 * </ul>
 *
 * <p>
 * NOTE: this code does not work if compiled by the Eclipse (Keppler) compiler
 * for Java 8. I have to compile with javac for it to work. There is some
 * problem with Lambda class naming in the code generated by Eclipse.
 * </p>
 *
 *
 */
public class AsyncRetryDemo {

    public static void main(final String[] args) throws Exception {

        new AsyncRetryDemo().case1();
        new AsyncRetryDemo().case2();
        new AsyncRetryDemo().case3();
        new AsyncRetryDemo().case4();

        // output is:
        //
        // case 1, sub 1: fail (max retries, called=1)
        // case 1, sub 2: fail (max retries, called=1)
        // case 2, sub 1: pass (called=2)
        // case 2, sub 2: fail (called=3)
        // case 3, sub 1: pass (called=2)
        // case 3, sub 2: pass (called=2)
        // case 4, sub 1: pass (called=2)
        // case 4, sub 2: pass (called=2)

    }

    private final AtomicInteger called = new AtomicInteger();

    private final CountDownLatch done = new CountDownLatch(2);

    /**
     * This represents a sequence of interdependent asynchronous operations that
     * might fail in a way that prescribes a retry (but in this case, all we are
     * doing is squaring an integer asynchronously and failing the first time).
     *
     * @param input
     *            to the process.
     *
     * @return promise to perform the work and produce either a result or a
     *         suggestion to retry (e.g. a stale version error).
     */
    private Observable<Integer> canBeRetried(final int a) {

        final Observable<Integer> rval;
        if (this.called.getAndIncrement() == 0) {
            rval = Observable.error(new RuntimeException(
                    "we always fail the first time"));
        } else {
            rval = Async.start(() -> a * a);
        }

        return rval;

    }

    private void case1() throws InterruptedException {

        /*
         * In this case, we invoke the observable-creator to get the async
         * promise. Of course, if it fails, any retry will fail as well because
         * the failed result is computed one time and pushed to all subscribers
         * forever.
         *
         * Thus this case fails because the first invocation of canBeRetried(..)
         * always fails.
         */
        final Observable<Integer> o = canBeRetried(2)

                .retry(2);

        check("case 1", o);

        this.done.await();

    }

    private void case2() throws InterruptedException {

        /*
         * In this case, we wrap canBeRetried(..) inside an observer that
         * invokes it on every subscription. So, we get past the retry problem.
         * But every new subscriber after the retry succeeds causes the work to
         * restart.
         */
        final Observable<Integer> o = Observable.create(
                new OnSubscribe<Integer>() {

                    @Override
                    public void call(final Subscriber<? super Integer> child) {
                        canBeRetried(2).subscribe(child);
                    }
                })

                .retry(2);

        check("case 2", o);

        this.done.await();

    }

    private void case3() throws InterruptedException {

        /*
         * In this case, we wrap canBeRetried(..) inside an observer that
         * invokes it on every subscription. So, we get past the retry problem.
         * We cache the result of the retry to solve the extra work problem.
         */
        final Observable<Integer> o = Observable.create(
                new OnSubscribe<Integer>() {

                    @Override
                    public void call(final Subscriber<? super Integer> child) {
                        canBeRetried(2).subscribe(child);
                    }
                })
                .retry(2)

                .cache();

        check("case 3", o);

        this.done.await();

    }

    private void case4() throws InterruptedException {

        /*
         * Same as case 3 but we use the retryAndCache(..) to do the work for
         * us.
         */
        final Observable<Integer> o = retryAndCache(() -> canBeRetried(2), 2);

        check("case 4", o);

        this.done.await();

    }

    private void check(final String label, final Observable<Integer> promise) {

        // does the work get retried on failure?
        promise.subscribe(
                v -> {
                    System.out.println(label + ", sub 1: "
                            + (this.called.get() == 2 ? "pass" : "fail")
                            + " (called=" + this.called.get() + ")");
                },
                x -> {
                    System.out.println(label
                            + ", sub 1: fail (max retries, called="
                            + this.called.get() + ")");
                    this.done.countDown();
                }, () -> {
                    this.done.countDown();
                });

        // do subsequent subscribers avoid invoking the work again?
        promise.subscribe(
                v -> {
                    System.out.println(label + ", sub 2: "
                            + (this.called.get() == 2 ? "pass" : "fail")
                            + " (called=" + this.called.get() + ")");
                },
                x -> {
                    System.out.println(label
                            + ", sub 2: fail (max retries, called="
                            + this.called.get() + ")");
                    this.done.countDown();
                }, () -> {
                    this.done.countDown();
                });

    }

    /**
     * Generalized retry and cache for case 4.
     *
     * @param binder
     *            user-provided supplier that assembles and starts the
     *            asynchronous work.
     *
     * @param retries
     *            number of times to retry on error.
     *
     * @return promise to perform the work and retry up to retry times on error.
     */
    private static <R> Observable<R> retryAndCache(
            final Func0<Observable<R>> binder, final int retries) {

        return Observable.create(new OnSubscribe<R>() {

            @Override
            public void call(final Subscriber<? super R> child) {
                binder.call().subscribe(child);
            }
        })

        .retry(retries)

        .cache();
    }

}
4

1 に答える 1

0

実際には、それをより良くするためのいくつかのオプションがあります。

最初のオプションは、 create の代わりにdeferを使用することです:

private void case5() throws InterruptedException {
    // Same as case 3 but using defer
    final Observable<Integer> o = Observable.defer(() -> canBeRetried(2)).retry(2).cache();

    check("case 5", o);

    this.done.await();
}

ただし、本当の問題は canBeRetired メソッドにあります。再試行のたびに呼び出す必要があります。より良いアプローチは、サブスクリプションごとにロジックを再実行する Observable を作成することです。メソッドは次のようになります。

 private Observable<Integer> canBeRetriedBetter(final int a) {
    return Observable.defer(() -> canBeRetried(a));
}

そしてコード:

private void case6() throws InterruptedException {

    final Observable<Integer> o = canBeRetriedBetter(2).retry(2).cache();

    check("case 6", o);

    this.done.await();
}

構成とカスタム変換を使用して、さらに改善することができます。それらを使用すると、一貫性のある再利用可能な方法で、一般的に使用される一連の演算子を任意のチェーンに適用できます。

たとえば、キャッシュを呼び出してストリームで再試行する演算子を定義できます。

   public static class RetryAndCache<T> implements Observable.Transformer<T, T>{
    private final int count;
    public RetryAndCache(int count) {
        this.count = count;
    }

    @Override
    public Observable<T> call(Observable<T> o) {
        return o.retry(count).cache();
    }
}

最後に、コード:

private void case7() throws InterruptedException {

    final Observable<Integer> o = canBeRetriedBetter(2).compose(new RetryAndCache(2));

    check("case 7", o);

    this.done.await();
}
于 2015-03-25T15:32:28.723 に答える