2

ホットとコールドの観測量の間の概念を理解しようとしており、Monifu ライブラリを試しています。私の理解では、次のコードでは、サブスクライバーの 1 つだけが Observable によって発行されたイベントを取得することになりますが、そうではありません!

scala> :paste
// Entering paste mode (ctrl-D to finish)

import monifu.reactive._
import scala.concurrent.duration._

import monifu.concurrent.Implicits.globalScheduler

val obs = Observable.interval(1.second).take(10)

val x = obs.foreach(a => println(s"from x ${a}"))
val y = obs.foreach(a => println(s"from y ${a}"))

// Exiting paste mode, now interpreting.

from x 0
from y 0
import monifu.reactive._
import scala.concurrent.duration._
import monifu.concurrent.Implicits.globalScheduler
obs: monifu.reactive.Observable[Long] = monifu.reactive.Observable$$anon$5@2c3c615d
x: Unit = ()
y: Unit = ()

scala> from x 1
from y 1
from x 2
from y 2
from x 3
from y 3
from x 4
from y 4
from x 5
from y 5
from x 6
from y 6
from x 7
from y 7
from x 8
from y 8
from x 9
from y 9

では、これは Observable が関心のあるすべてのサブスクライバーにイベントを発行しているように見えますか?

4

1 に答える 1

2

私はモニフの主な作者です。

コールドオブザーバブルとは、そのサブスクライブ機能が各サブスクライバーに対して (各subscribe()呼び出しで) 新しいデータ ソースを開始することを意味しますが、ホットオブザーバブルとは、複数のサブスクライバー間で同じデータ ソースを共有することです。

例として、ファイルがデータ ソースであるとします。ファイルから行を発行する単純な Observable をモデル化しましょう。

def fromFile(file: File): Observable[String] = {
  // this is the subscribe function that
  // we are passing to create ;-)
  Observable.create { subscriber =>
    // executing things on our thread-pool
    subscriber.scheduler.execute {
      val source = try {
        Observable.fromIterable(scala.io.Source
          .fromFile(file).getLines().toIterable)
      } 
      catch {
        // subscribe functions must be protected
        case NonFatal(ex) =>
          Observable.error(ex)
      }

      source.unsafeSubscribe(subscriber)
    }
  }
}

この関数は、コールド オブザーバブルを作成します。つまり、サブスクライブしたオブザーバーごとに新しいファイル ハンドルを開き、サブスクライブした各オブザーバーの行を読み取って出力します。

しかし、それをホット オブザーバブルに変えることができます。

// NOTE: publish() turns a cold observable into a hot one
val hotObservable = fromFile(file).publish()

そして、あなたがこれを行うときの違いは次のとおりです。

val x = observable.subscribe()
val y = observable.subscribe()

オブザーバブルがホットな場合:

  1. connect()オブザーバブルは、呼び出されるまで何もしません
  2. の後connect()、同じファイルが開かれ、両方がまったく同じイベントを受け取ります
  3. そのファイルからすべての行が発行された後、(共有) データソースが既に使い果たされているため、新しいサブスクライバーは何も取得しません。

観測対象が冷たい場合:

  1. サブスクライブごとに、新しいファイル ハンドルが開かれ、読み取られます。
  2. 要素は の直後に発行されるsubscribe()ため、connect()
  3. サブスクライブしているすべてのオブザーバーは、その時点に関係なく、そのファイルからすべての行を受け取ります。

Monifu にも適用されるいくつかの参照:

  1. RxJava の wiki から接続可能
  2. Rx の概要: ホット オブザーバブルとコールド オブザーバブル
  3. RxJava の wiki の件名
于 2015-07-30T12:13:53.523 に答える