ここでの問題は、ここで RxJava と RxScala のコードを混在させることです。ご覧のとおり、RxScala は RxJava 機能の単なるラッパーです。前者は後者に転送するだけで、「実際の」実装はありません。これは、2 つ以上のバージョンではなく 1 つのバージョンのみを維持する必要があるため便利です。
scheduler
あなたの例のタイプはrx.Scheduler
であるため、 RxJavaScheduler
です。ただし、RxScala であるsubscribeOn
を提供する必要があります。したがって、RxJava を RxScala のものに変換する必要があります。rx.lang.scala.Scheduler
Scheduler
Scheduler
ただし、あなたの場合、より良い方法があります。ファクトリメソッドを使用してExecutors.newSingleThreadExecutor
ラップします。次に、これを にラップすると、 で使用できるスケジューラが得られます。コードは次のようになります (どのスレッドで実行されているかを確認するための print ステートメントを含めました)。scala.concurrent.ExecutionContext
fromExecutor
rx.lang.scala.schedulers.ExecutionContextScheduler
subscribeOn
val e = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
val s = ExecutionContextScheduler(e)
Observable.just(1, 2, 3)
.subscribeOn(s)
.doOnNext(x => println(s"thread - ${Thread.currentThread().getName}, value - $x"))
.subscribe()