ブロッキング IO 呼び出し (HTTP 呼び出しなど) を行うチェーンがあります。ブロッキング呼び出しで値を消費し、中断せずに続行しますが、その間に積み重なったものはすべて破棄し、同じ方法で次の値を消費します。
次の例を検討してください。
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS).onBackpressureLatest().map {
Thread.sleep(1000)
it
}.blockingForEach { println(it) }
}
単純な観点からは、 のようなものを出力することを期待します0, 10, 20, ...
が、 を出力し0, 1, 2, ...
ます。
私は何を間違っていますか?
編集:
debounce
受信ストリームを食い尽くすために素朴に追加することを考えました:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS)
.debounce(0, TimeUnit.MILLISECONDS)
.map {
Thread.sleep(1000)
it
}
.blockingForEach { println(it) }
}
しかし、今、私はjava.lang.InterruptedException: sleep interrupted
.
編集:
動作しているように見えるのは次のとおりです。
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS)
.throttleLast(0, TimeUnit.MILLISECONDS)
.map {
Thread.sleep(1000)
it
}
.blockingForEach { println(it) }
}
出来上がりは期待通り0, 10, 20, ...
!!
それは正しい方法ですか?
throttleLast
Computation-Scheduler に切り替えることに注意しました。元のスケジューラに戻す方法はありますか?
編集:
私はまたjava.lang.InterruptedException: sleep interrupted
、その変種で時折得ます。