1

サブジェクトからアイテムを放出するオブザーバブルを再帰的に定義しようとしています。または、一定の時間が経過した場合はデフォルト値です。この場合、タイマーのデフォルト値ゼロを使用しています。私は RxScala を使用しており、次のコードから始めています。

val s = PublishSubject[Int]()

def o: Observable[Unit] = {
  val timeout = Observable.timer(1 second)
  Observable.amb(s, timeout)
    .first
    .concatMap((v) => {
      println(v)
      o
    })
}

ComputationScheduler().createWorker.schedule {
  var value = 0
  def loop(): Unit = {
    Thread.sleep(5000)
    s.onNext(value + 1)
    value += 1
    loop()
  }
  loop()
}

o.toBlocking.last

これは機能するはずですが、出力がわかりにくいです。0 の他のすべてのシーケンスには、予想される 4 つではなく 2 つが含まれます。2 つのゼロが発行され、残りの 3 秒が経過しますが、出力はありません。

0
0
0
0
1
0
0
2
0
0
0
0
3
0
0
4
0
0
0
0
5
0
0
6
0
0
0
0
7
0
0
8
4

2 に答える 2

0

コードを次のようにリファクタリングすると、(私のマシンで) 期待される結果が生成されます。

object Test {
  def main(args: Array[String]) {
    val s = PublishSubject[Int]()

    val timeout = Observable.timer(1 second)

    def o: Observable[Unit] = {
      Observable.amb(s, timeout).first
        .concatMap((v) => {
          println(v)
          o
        })
    }

    var value = 0
    NewThreadScheduler().createWorker.scheduleRec {
      Thread.sleep(5000)
      value += 1
      s.onNext(value)
    }

    o.toBlocking.last
  }
}

NewThreadScheduler への切り替えと、手動の再帰的スケジューリングではなく、scheduleRec メソッドの使用に注意してください。

于 2015-03-19T12:24:08.847 に答える
0

これは確かに不可解です!だからここに理論があります:

  1. 実際、コードは 5 秒ごとに 5 秒ではなく 4 ティックを生成しています。
  2. 4 番目に競合状態があり、1 つは最初にタイムアウトによって勝ち、次にワーカー、次にタイムアウトなどで勝ちました。

したがって、シーケンスが 00001 002 00003 である代わりに... 0000 1002 0000 として見てください...

したがって、ここで2つの別々の問題が発生する可能性があり、いじらないと多くのことはできませんが、試すことができることは次のとおりです。

  1. シリアル番号も o() に追加して、どのタイムアウトがレースに勝っていないかを確認できます。
  2. 値を 1 秒と 5 秒から、1.5 秒と 5 秒のように互いに倍数ではない値に変更します。これにより、1 つの問題を取り除き、他の問題に集中することができます。
  3. 毎秒「----」を印刷する外部の無関係なワーカーを配置します。0.3 秒程度後に開始します。分岐点がどこにあるのか、よりよくわかるかもしれません。
于 2015-03-18T20:22:54.943 に答える