観測可能なストリームを単独でスライスしようとしています。例:
val source = Observable.from(1 to 10).share
val boundaries = source.filter(_ % 3 == 0)
val result = source.tumblingBuffer(boundaries)
result.subscribe((buf) => println(buf.toString))
出力は次のとおりです。
Buffer()
Buffer()
Buffer()
Buffer()
source
boundaries
に到達する前に、おそらくオンラインで反復されるresult
ため、境界と結果のバッファーのみを作成しますが、埋めるものは何もありません。
これに対する私のアプローチはpublish
/を使用していconnect
ます:
val source2 = Observable.from(1 to 10).publish
val boundaries2 = source2.filter(_ % 3 == 0)
val result2 = source2.tumblingBuffer(boundaries2)
result2.subscribe((buf) => println(buf.toString))
source2.connect
これにより、問題なく出力が生成されます。
Buffer(1, 2)
Buffer(3, 4, 5)
Buffer(6, 7, 8)
Buffer(9, 10)
今、私connect
は外の世界から隠す必要があり、connect
それresult
がサブスクライブされたときに(クラス内でこれを行っており、公開したくありません)。何かのようなもの:
val source3 = Observable.from(1 to 10).publish
val boundaries3 = source3.filter(_ % 3 == 0)
val result3 = source3
.tumblingBuffer(boundaries3)
.doOnSubscribe(() => source3.connect)
result3.subscribe((buf) => println(buf.toString))
しかし今、doOnSubscribe
アクションは決して呼び出されないので、公開されたものsource
は決して接続されません...
どうしたの?