1

Interval 演算子を使用していますが、パイプラインで例外が発生した場合でもアイテムを発行し続けたいと考えています。

そこでonErrorResumeNext、例外の場合にアイテムを発行することを試みます。しかし、このアイテムを発行した後、間隔がそれ以上のアイテムの発行を停止することがわかりました。

ここで私の単体テスト。

@Test
public void testIntervalObservableWithError() {
    Subscription subscription = Observable.interval(50, TimeUnit.MILLISECONDS)
            .map(time -> "item\n")
            .map(item -> item = null)
            .map(String::toString)
            .onErrorResumeNext(t-> Observable.just("item with error emitted"))
            .subscribe(System.out::print, t->{
                        System.out.println(t);
                    }
                   );
    TestSubscriber testSubscriber = new TestSubscriber((Observer) subscription);
    testSubscriber.awaitTerminalEvent(20000, TimeUnit.MILLISECONDS);
}

私はこの振る舞いに混乱しています。onErrorResumeNext

解決:

いくつかの説明の後、エラーが発生すると、観測可能な t´s が完了していることに気付きました。そのため、例外を持つ可能性のあるオブザーバブルを別のオブザーバブルにラップすることになり、flatMap を使用しています。そのため、メインの Observable はアイテムを放出し続けます。

@Test
public void testIntervalObservableWithError() {
    Observable.interval(100, TimeUnit.MILLISECONDS)
            .map(time -> "item\n")
            .flatMap(item -> Observable.just(item)
                    .map(String::toString))
            .subscribe(System.out::print); 
    TestSubscriber testSubscriber = new TestSubscriber();
    testSubscriber.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS);
}

そのような魔法をすべて実行できるオペレーターがいる場合、私は知りたい.

リグラード

4

3 に答える 3

1

RxJava ストリームで使用されるコントラクトは、エラーが発行されると、それ以上アイテムを発行してはならないというものです。ユース ケースでエラー後もストリームを継続する必要がある場合は、エラーをonNextエミッションにも変換する必要があります。ラッパータイプを作成し、次ValueOrError<T>の観点から考え始めObservable<ValueOrError<T>>ます。

Observable<Integer> source = ...
Observable<ValueOrError<Integer>> o = 
  source.map(x -> {
    try { 
      return new ValueOrError<>(mightThrow(x)); 
    }
    catch (Throwable e) {
      return new ValueOrError<>(e);
    });
于 2016-11-17T23:25:51.330 に答える
1

onErrorResumeNextがトリガーされたときに、アップストリームが既にエラーで終了しているため、サブスクリプションが中断されます。そして、例外を下流に送る代わりに、アイテムを発行するだけです。アップストリームを存続させるには、例外がスローされるのを防ぐ必要があります。

特定の例のソリューションは次のようになります。

...
    .map(time -> "item\n")
    .map(item -> item = null)
    .map(item -> {
        try {
            return item.toString();
        } catch (NullPointerException e) {
            return "item with error emitted";
    })
    //no onErrorResumeNext()
    .subscribe ...

onErrorResumeNexterror を項目に置き換えて を呼び出すだけonCompleteです。

于 2016-11-18T00:57:49.730 に答える