3

Observable次のものがあるとしましょうrxjava-scala-0.18.4

@volatile var dorun = true
var subscriber: Subscriber[String] = null
val myObs = Observable { obs: Subscriber[String] =>
  subscriber = obs
  Subscription { println("unsubscribed"); dorun = false }
}

val sub = myObs.head.subscribe(println(_))

assertTrue(dorun)

subscriber.onNext("hello")
Thread.sleep(500)
assertFalse(dorun)

subscriber.onNext("world")
Thread.sleep(500)
assertFalse(dorun)

2 番目のアサーションは失敗します。つまり、headサブスクライブは解除されません。Observables の私の理解は間違っていますか、それともhead最初の要素が発行された後に登録を解除する必要がありますか?

4

1 に答える 1

3

メソッドを見てください:が に設定されるsubscribe()までループしますが、それを行う唯一の方法は、サブスクリプションを閉じることです。問題は、まだ誰もサブスクリプションを持っていないことです: ループはあなたが戻ることを妨げます. サブスクライブがまだ終了していないため、最初のアイテムが配信された後、オペレーターは基になるサブスクリプションを終了できません。したがって、永遠にループし続けます。runfalsehead

1 つの解決策は、ループを にスケジュールされたアクションに移動することSchedulers.trampoline()です。その後、 から戻ってからしばらくしてからイベントが配信されsubscribe()ます。

さらに、あなたのメソッドでは、次のように、渡される subscribe()新しいサブスクリプション オブジェクトを追加する必要があるようです。Subscriber

val myObs = Observable {
    obs: rx.lang.scala.Subscriber[String] =>
        ...

        obs.add(
            Subscription {
                dorun = false
                println("unsubscribed")
            }
        )
}
于 2014-06-09T17:35:14.200 に答える