2

「onNext」で例外が発生した後、さらに「onError」が呼び出された後でも、rxjavaが監視可能なシーケンスから次の要素を消費する理由を誰かが説明できますか?

ここに私のシミュレーションがあります:

import java.util.concurrent.{TimeUnit, CountDownLatch}
import rx.lang.scala._

object Tests {
  val counter = new CountDownLatch(1)

  def buildStream(num: Int) = {
    Stream.range(1, num)
      .map {s =>
      println(s"[${Thread.currentThread().getId}] Taken: $s");
      s}
  }
  val stream = buildStream(10).toSeq
  stream.toObservable
    .subscribe(
      onNext = x => consume(x),
      onError = e => println(s"ERROR! $e"),
      onCompleted = () => {println("completed"); counter.countDown()}
    )

 def consume(x: Int) = {
    println(s"[${Thread.currentThread().getId}] consuming: $x")
    Thread.sleep(100)
    if (x == 5) {
      throw new Exception(s"[${Thread.currentThread().getId}] consume?! ha!")
    }
  }
  counter.await(10, TimeUnit.SECONDS)
}

その結果は次のとおりです。例外が発生した後、項目#6がシーケンスから取得されていることがわかります。

[39] consuming: 1
[39] Taken: 2
[39] consuming: 2
[39] Taken: 3
[39] consuming: 3
[39] Taken: 4
[39] consuming: 4
[39] Taken: 5
[39] consuming: 5
ERROR! java.lang.Exception: [39] consume?! ha!
[39] Taken: 6
res0: rx.lang.scala.Subscription

私が理解できる限り、通常のシーケンスから作成された監視可能なコレクションは「コールド」であり、現在のアイテムがオブザーバーによって正常に処理された後にのみ、次のアイテムをソース シーケンスから取得する必要があります。スレッドの問題が疑われる可能性がありますが、「コールド」は真実ではない「コールド」ではないことを意味し、同じスレッド ID が常に使用されていることも明確にわかります。

では、なぜアイテム6がシーケンスから取られるのでしょうか?!

4

1 に答える 1

0

あなたObservableは同期されています。Observableつまり、が新しい要素を生成するObserverのを防ぐことはできません。Observableただし、RxJavaは、またはObserverの後にメッセージを受信しないことを約束します。それはあなたの例と一致しています。onErroronCompleted

次の 0.17.0 では、RxJava がこの問題を解決します。ここで議論を確認できます: https://github.com/Netflix/RxJava/issues/802

于 2014-03-04T06:09:45.547 に答える