0

観測可能なストリームを単独でスライスしようとしています。例:

val source = Observable.from(1 to 10).share
val boundaries = source.filter(_ % 3 == 0)
val result = source.tumblingBuffer(boundaries)

result.subscribe((buf) => println(buf.toString))

出力は次のとおりです。

Buffer()
Buffer()
Buffer()
Buffer()

sourceboundariesに到達する前に、おそらくオンラインで反復されるresultため、境界と結果のバッファーのみを作成しますが、埋めるものは何もありません。

これに対する私のアプローチはpublish/を使用していconnectます:

val source2 = Observable.from(1 to 10).publish
val boundaries2 = source2.filter(_ % 3 == 0)
val result2 = source2.tumblingBuffer(boundaries2)

result2.subscribe((buf) => println(buf.toString))
source2.connect

これにより、問題なく出力が生成されます。

Buffer(1, 2)
Buffer(3, 4, 5)
Buffer(6, 7, 8)
Buffer(9, 10)

今、私connectは外の世界から隠す必要があり、connectそれresultがサブスクライブされたときに(クラス内でこれを行っており、公開したくありません)。何かのようなもの:

val source3 = Observable.from(1 to 10).publish
val boundaries3 = source3.filter(_ % 3 == 0)
val result3 = source3
          .tumblingBuffer(boundaries3)
          .doOnSubscribe(() => source3.connect)

result3.subscribe((buf) => println(buf.toString))

しかし今、doOnSubscribeアクションは決して呼び出されないので、公開されたものsourceは決して接続されません...

どうしたの?

4

1 に答える 1

1

あなたのpublishソリューションは正しい方向に進んでいました。ただし、 type のpublish引数 (ドキュメントを参照) としてラムダを取る代替演算子がありObservable[T] => Observable[R]ます。このラムダの引数は、安全に複数回サブスクライブできる元のストリームです。ラムダ内で、元のストリームを好みに合わせて変換します。あなたの場合、ストリームをフィルタリング、そのフィルターでバッファリングします。

Observable.from(1 to 10)
    .publish(src => src.tumblingBuffer(src.filter(_ % 3 == 0)))
    .subscribe(buf => println(buf.toString()))

connectこの演算子の最も良い点は、後で呼び出す必要がないことです。

于 2016-05-10T06:30:23.557 に答える