0

ブロッキング IO 呼び出し (HTTP 呼び出しなど) を行うチェーンがあります。ブロッキング呼び出しで値を消費し、中断せずに続行しますが、その間に積み重なったものはすべて破棄し、同じ方法で次の値を消費します。

次の例を検討してください。

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS).onBackpressureLatest().map {
    Thread.sleep(1000)
    it
  }.blockingForEach { println(it) }
}

単純な観点からは、 のようなものを出力することを期待します0, 10, 20, ...が、 を出力し0, 1, 2, ...ます。

私は何を間違っていますか?

編集:

debounce受信ストリームを食い尽くすために素朴に追加することを考えました:

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS)
    .debounce(0, TimeUnit.MILLISECONDS)
    .map {
      Thread.sleep(1000)
      it
    }
    .blockingForEach { println(it) }
}

しかし、今、私はjava.lang.InterruptedException: sleep interrupted.

編集:

動作しているように見えるのは次のとおりです。

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS)
    .throttleLast(0, TimeUnit.MILLISECONDS)
    .map {
      Thread.sleep(1000)
      it
    }
    .blockingForEach { println(it) }
}

出来上がりは期待通り0, 10, 20, ...!!

それは正しい方法ですか?

throttleLastComputation-Scheduler に切り替えることに注意しました。元のスケジューラに戻す方法はありますか?

編集:

私はまたjava.lang.InterruptedException: sleep interrupted、その変種で時折得ます。

4

1 に答える 1