サブジェクトからアイテムを放出するオブザーバブルを再帰的に定義しようとしています。または、一定の時間が経過した場合はデフォルト値です。この場合、タイマーのデフォルト値ゼロを使用しています。私は RxScala を使用しており、次のコードから始めています。
val s = PublishSubject[Int]()
def o: Observable[Unit] = {
val timeout = Observable.timer(1 second)
Observable.amb(s, timeout)
.first
.concatMap((v) => {
println(v)
o
})
}
ComputationScheduler().createWorker.schedule {
var value = 0
def loop(): Unit = {
Thread.sleep(5000)
s.onNext(value + 1)
value += 1
loop()
}
loop()
}
o.toBlocking.last
これは機能するはずですが、出力がわかりにくいです。0 の他のすべてのシーケンスには、予想される 4 つではなく 2 つが含まれます。2 つのゼロが発行され、残りの 3 秒が経過しますが、出力はありません。
0
0
0
0
1
0
0
2
0
0
0
0
3
0
0
4
0
0
0
0
5
0
0
6
0
0
0
0
7
0
0
8