27

私はRxJavaを初めて使用しますが、RxJavaの学習を支援するために取り組んでいるプロジェクトに統合しています。ベスト プラクティスに関する質問がありました。

onError処理が停止しない場合の対処法について質問がありObservableます。

セットアップは次のとおりです。

2 つ以上のネットワーク リクエストを実行したいユーザー ID のリストがあります。ユーザー ID に対するネットワーク要求のいずれかが失敗した場合、そのユーザー ID は更新されず、スキップできます。これにより、他のユーザー ID の処理が妨げられることはありません。解決策はありますが、サブスクライブがネストされています (2 番目のコード ブロックを参照)。私が見ている 1 つの問題は、各呼び出しが失敗した場合、特定のしきい値数が失敗したことを検出した後でも、残りの呼び出しがネットワーク リソースにヒットするのを短絡して停止する方法がないことです。

これを行うより良い方法はありますか?

従来のコードでは:

List<String> results = new ArrayList<String>();
for (String userId : userIds) {
    try {
        String info = getInfo(userId);  // can throw an GetInfoException
        String otherInfo = getOtherInfo(userId);  // can throw an GetOtherInfoException
        results.add(info + ", " + otherInfo);
    } catch (GetInfoException e) {
        log.error(e);
    } catch (GetOtherInfoException e) {
        log.error(e);
    }
}

問題:

擬似コード:

userid -> network requests -> result 
1 -> a, b -> onNext(1[a ,b])
2 -> a, onError -> onError
3 -> a, b -> onNext(3[a, b])
4 -> a, b -> onNext(4[a, b])

以下は、userIds のリストと、2 つの情報要求ごとの実際の例です。実行すると、失敗することがわかります(ソースコードの下を参照)

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {

            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        });
        return observable;
    }

    public static void main(String[] args) {

        Observable<String> userIdObservable = getUserIds();
        Observable<String> t = userIdObservable.flatMap(new Func1<String, Observable<String>>() {

            public Observable<String> call(final String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ",t1, "3");
                return Observable.mergeDelayError(info1, info2);
            }
        });

        t.subscribe(new Action1<String>() {

            public void call(String t1) {
                System.out.println(t1);
            }
        }, new Action1<Throwable>() {

            public void call(Throwable t1) {
                t1.printStackTrace();
            }
        },
        new Action0(){

            public void call() {
                System.out.println("onComplete");
            }

        });
    }
}

出力:

1::: 1
2::: 1
2::: 2
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:32)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:266)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:210)
        at rx.operators.OperationMergeDelayError$2.onSubscribe(OperationMergeDelayError.java:77)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable.onSubscribe(OperationMergeDelayError.java:171)
        at rx.operators.OperationMergeDelayError$1.onSubscribe(OperationMergeDelayError.java:64)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationMap$MapObservable$1.onNext(OperationMap.java:105)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMap$MapObservable.onSubscribe(OperationMap.java:102)
        at rx.operators.OperationMap$2.onSubscribe(OperationMap.java:76)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)

ネストされたサブスクライブ ソリューション:

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {

            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        });
        return observable;
    }

    public static void main(String[] args) {

        Observable<String> userIdObservable = getUserIds();
        userIdObservable.subscribe(new Action1<String>() {

            public void call(String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ", t1, "3");
                Observable.merge(info1, info2).subscribe(new Action1<String>() {

                    public void call(String t1) {
                        System.out.println(t1);
                    }
                }, new Action1<Throwable>() {

                    public void call(Throwable t1) {
                        t1.printStackTrace();
                    }
                },
                        new Action0() {

                            public void call() {
                                System.out.println("onComplete");
                            }

                        });
            }
        });
    }
}

出力:

1::: 1
2::: 1
onComplete
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:47)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:42)
        at rx.Observable$2.onNext(Observable.java:381)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:367)
        at TestMergeDelayError.main(TestMergeDelayError.java:42)
1::: 3
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:47)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:42)
        at rx.Observable$2.onNext(Observable.java:381)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:367)
        at TestMergeDelayError.main(TestMergeDelayError.java:42)
1::: 4
2::: 4
onComplete
1::: 5
2::: 5
onComplete
1::: 6
2::: 6
onComplete

ご覧のとおり、失敗した個々のユーザー ID のみが個々の処理を停止しましたが、残りのユーザー ID は処理されました。

アドバイスを探しているだけで、この解決策が理にかなっているかどうか、そうでない場合はベストプラクティスが何であるかを確認してください.

ありがとう、アレックス

4

4 に答える 4

18

エラーを無視したいので、 を試すことができますonErrorResumeNext(Observable.<String>empty());。例えば、

Observable<String> info1 = getInfo("1::: ", t1, "2").onErrorResumeNext(Observable.<String>empty());
Observable<String> info2 = getInfo("2::: ", t1, "3").onErrorResumeNext(Observable.<String>empty());
return Observable.merge(info1, info2);
于 2014-03-12T02:36:30.223 に答える
9

ベスト プラクティスは、複数の Observable を 1 つに結合するmergeDelayError( )を使用して、エラーを伝播する前にエラーのない Observable を続行できるようにすることです。

mergeDelayErrorのように動作しますmerge。例外は、マージされている Observable の 1 つが onError 通知で終了した場合です。これがマージで発生した場合、マージされた Observable はすぐに onError 通知を発行して終了します。一方、mergeDelayError は、他のエラーを生成しない Observables に、それらのアイテムの発行を終了する機会をマージしているという機会を与えるまで、エラーの報告を保留します。マージされた他のすべての Observable が終了したときの onError 通知。

于 2015-11-24T09:31:42.137 に答える
1

Rx初心者として、例外を個別に処理し、次のイベントの処理を続行するための簡単な答えも探していましたが、@Daniele Segatoが求めていたことに対する答えを見つけることができませんでした. 制御できない場合の解決策の 1 つを次に示します。

上記の例では、オブザーバブルを制御できることを前提としています。つまり、1 つの方法は、mergeDelayError を使用してエラーを最後まで遅らせるか、マージを使用してすべてのイベントの既知の空のイベント Observable を Observable として個別に返すことです。

ソース イベント エラーの場合は、lift を使用して、基本的に現在の Observable の値を適切に処理する別の Observable を作成できます。SimpleErrorEmitter クラスは、時々失敗する可能性のある無制限のストリームをシミュレートします。

Observable.create(new SimpleErrorEmitter())
        // transform errors to write to error stream
        .lift(new SuppressError<Integer>(System.err::println))
        .doOnNext(System.out::println)  // and everything else to console
        .subscribe();


class SimpleErrorEmitter implements OnSubscribe<Integer> {
@Override
public void call(Subscriber<? super Integer> subscriber) {
    subscriber.onNext(1);
    subscriber.onNext(2);

    subscriber.onError(new FooException());

    subscriber.onNext(3);
    subscriber.onNext(4);

    subscriber.onCompleted();
}

class SuppressError<T> implements Operator<T, T> {
final Action1<Throwable> onError;
public SuppressError(Action1<Throwable> onError) {
    this.onError = onError;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> t1) {
    return new Subscriber<T>(t1) {
        @Override
        public void onNext(T t) {
            t1.onNext(t);
        }
        @Override
        public void onError(Throwable e) { // handle errors using a separate function
            onError.call(e);
        }
        @Override
        public void onCompleted() {
            t1.onCompleted();
        }
    };
}

正常に試行/キャッチして続行できる加入者処理エラーの場合

    Observable<Integer> justInts = justStrs.map((str) -> {
        try {
            return Integer.parseInt(str);
        } catch (NumberFormatException e) {
            return null;
        }
    });

私はまだ失敗したイベントを再試行または遅延して次から続行する簡単な方法を見つけようとしています。

    Observable<String> justStrs = Observable
            .just("1", "2", "three", "4", "5")  // or an unbounded stream
            // both these retrying from beginning 
            // when you delay or retry, if they are of known exception type
            .retryWhen(ex -> ex.flatMap(eachex -> {
                // for example, if it is a socket or timeout type of exception, try delaying it or retrying it
                if (eachex instanceof RuntimeException) {
                    return Observable.timer(1L, TimeUnit.MICROSECONDS, Schedulers.immediate());
                }
                return Observable.error(eachex);
            }))
            // or simply retry 2 times
            .retry(2) // if it is the source problem, attempt retry
            .doOnError((ex) -> System.err.println("On Error:" + ex));

参考:https ://groups.google.com/forum/#!topic/rxjava/trm2n6S4FSc

于 2016-10-28T08:04:36.027 に答える
1

Observable.flatMapのソースを見る:

return merge(map(func));

考えられるすべてのユーザー ID を処理したい場合は、flatMap の修正版を使用できます。

Observable.mergeDelayError(userIdObservable.map(userInfoFunc))

さらに、あなたが言うなら:

ユーザー ID に対するネットワーク要求のいずれかが失敗した場合、そのユーザー ID は更新されず、スキップできます。

次に使用しないでください:

return Observable.mergeDelayError(info1, info2);

これにより、一方が失敗した場合でも、info1 と info2 の両方が要求されるためです。

むしろ:

return Observable.merge(info1, info2);

info1 と info2 が同じスレッドにサブスクライブされている場合、それらは順次実行されるため、info1 が失敗した場合、info2 は要求されません。info1 と info2 は I/O バウンドであるため、これらを並行して実行する必要があると仮定します。

getInfo("1::: ", t1, "2").subscribeOn(Schedulers.io());
getInfo("2::: ",t1, "3").subscribeOn(Schedulers.io());

これにより、処理が大幅に高速化されます

コード全体:

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        return Observable.create(new OnSubscribeFunc<String>() {

            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        })
        .subscribeOn(Schedulers.io());
    }

    public static void main(String[] args) {

        Observable<String> userIdObservable = getUserIds();
        Observable<String> t = Observable.mergeDelayError(userIdObservable.map(new Func1<String, Observable<String>>() {

            public Observable<String> call(final String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ",t1, "3");
                return Observable.merge(info1, info2);
            }
        }));
        //rest is the same
    }
}
于 2017-01-05T15:05:55.250 に答える