1

私のリアクティブ プログラムを rxscala でテストするには、次のようなものを作成する必要がありますObservable

val numberStream: Observable[Int] = Observable.???()

どれの

  1. 発行番号1
  2. それから待つ1s
  3. 発行番号4
  4. それから待つ3s
  5. 発行番号2
  6. それから待つ2s

私は醜い解決策を持っていますThreadReplaySubject

val subject: Subject[Int] = ReplaySubject()
val numberStream: Observable = subject

new Thread(new Runnable {
    def run = {
        subject.onNext(1)
        Thread.sleep(1000)
        subject.onNext(4)
        Thread.sleep(3000)
        subject.onNext(2)
        Thread.sleep(2000)
    }
}).start()

より良い解決策はありますか?

4

1 に答える 1

1

いくつかの Observable を遅延で連結し、最終的な Observable をemptyサブスクリプション遅延で連結できます。

val numberStream = (
     Observable.just(1) ++ 
     Observable.just(4).delay(1.second) ++
     Observable.just(2).delay(3.second) ++
     Observable.empty.delaySubscription(2.second))

補足として、テストしている場合はTestScheduler、 の 2 番目の引数として渡すことができるを使用する必要がありますdelay

于 2015-11-13T16:29:06.033 に答える