7

RX Java の scala バインディングをしばらく使用しており、これを Akka Actors と組み合わせることを考えています。Akka 間でRX を渡すことが安全/可能かどうかを知りたいです。ObservableActorたとえば、20 までの偶数の平方を (毎秒) 出力するプログラム:

/* producer creates an observable and sends it to the worker */
object Producer extends Actor {
  val toTwenty : Observable[Int] = Observable.interval(1 second).take(20)

  def receive = {
    case o : Observable[Int] =>
      o.subscribe( onNext => println )
  }

  worker ! toTwenty
}


/* worker which returns squares of even numbers */
object Worker extends Actor {
  def receive = {
    case o : Observable[Int] => 
       sender ! o filter { _ % 2 == 0 } map { _^2 }
  }
}

(これは疑似コードとして扱ってください。コンパイルされません)。sendあるアクターから別のアクターに Observables を送信していることに注意してください。理解したい:

  • Akka と RX は Observable へのアクセスを自動的に同期しますか?
  • を分散システム経由で送信することはObservableできません。ローカル メモリ内のオブジェクトへの参照です。しかし、それはローカルで機能しますか?
  • この簡単な例で、 のsubscribeコールで作業がスケジュールされるとしProducerます。作業を分割して、各アクターで別々に行うことはできますか?

余談: RX と Actors を組み合わせようとするプロジェクトをいくつか見てきました。

http://jmhofer.johoop.de/?p=507および https://github.com/jmhofer/rxjava-akka

Observableしかし、これらは単にアクター間のメッセージとして を渡すだけではないという点で異なります。彼らは最初に値を取得するために呼び出しsubscribe()、次にこれらをアクターのメールボックスに送信し、これから新しい値を作成しますObservable。それとも私は間違っていますか?

4

3 に答える 3

10

あなたのアプローチは良い考えではありません。Akka の背後にある主なアイデアは、メッセージがアクターのメールボックスに送信され、アクターがメッセージを (1 つのスレッドで) 順次処理するというものです。この方法では、2 つのスレッドがアクターの状態にアクセスすることはできず、同時実行の問題は発生しません。

あなたの場合、Observable で subscribe を使用します。onNext コールバックは別のスレッドで実行される可能性があります。したがって、突然、2 つのスレッドがアクターの状態にアクセスできる可能性があります。したがって、コールバック内で何をするかについては、非常に注意する必要があります。これが、他の実装を最後に観察した理由です。これらの実装は、onNext 内の値を取得し、この値をメッセージとして送信するようです。このようなコールバック内でアクターの内部状態を変更してはなりません。代わりに、同じアクターにメッセージを送信してください。このようにして、1 つのスレッドでの順次処理が再び保証されます。

于 2014-03-01T18:27:15.283 に答える
5

Observable時間をかけて実験したところ、Akka で s を使用できることがわかりました。実際、Observableは の多変量拡張と考えることができるのでFuture、Actors と Futures を組み合わせるのと同じガイドラインに従うことができます。実際、Akka での の使用は、公式ドキュメントと教科書 (例: Akka Concurrency、Wyatt 2013)Futureの両方でサポート/推奨されていますが、多くの注意事項があります。

まずポジティブ:

  • Observables, like Futures は不変であるため、理論的にはメッセージで安全に受け渡せるはずです。
  • Observableのように、実行コンテキストを指定できますFuture。これは を使用して行われObservable.observeOn(scheduler)ます。system.dispatcherAkka ディスパッチャー (または などcontext.dispatcher) をrx.lang.scala.ExecutorSchedulerコンストラクターに渡すことで、Akka の exec コンテキストからスケジューラーを作成できます。これにより、それらが同期されていることが保証されます。
  • 上記に関連して、オブザーバブルのスケジューラーを暗黙的に指定できるrx-scala の機能強化 ( https://github.com/Netflix/RxJava/issues/815#issuecomment-38793433 ) があります。
  • ask先物は、このパターンで Akka にうまく適合します。Observables にも同様のパターンを使用できます (この投稿の下部を参照)。これにより、リモート オブザーバブルにメッセージを送信する際の問題も解決されます。

ここで注意事項:

  • 彼らは未来と同じ問題を共有しています。たとえば、ページの下部を参照してください: http://doc.akka.io/docs/akka/2.3.2/general/jmm.html。また、Wyatt 2013 の先物に関する章。
  • @mavilein の回答のように、これは、Observable.subscribe()内部状態にアクセスするために Actor を囲むスコープを使用してはならないことを意味します。たとえばsender、サブスクリプションを呼び出すべきではありません。代わりに、次の例のように、値を val に格納してから、この val にアクセスします。
  • Akka が使用するスケジューラーの解像度は Rx とは異なります。デフォルトの解像度は 100 ミリ秒です (Wyatt 2013)。これがどのような問題を引き起こす可能性があるかを経験したことがある人は、以下にコメントしてください!

ask最後に、 Observablesのパターンに相当するものを実装しました。toObservableorを使用して Observable を非同期的に返し、一時的なアクターと舞台裏??に支えられています。PublishSubjectソースによって送信されたメッセージはタイプrx.lang.scala.Notificationusingであるため、監視可能なコントラクトの完了状態とエラーmaterialize()状態を満たしていることに注意してください。そうしないと、これらの状態をシンクに通知する方法がありません。ただし、任意の種類のメッセージを送信することを妨げるものは何もありません。これらは単に を呼び出します。オブザーバブルには、メッセージが特定の間隔で受信されない場合にタイムアウト例外で停止するタイムアウトがあります。onNext()

次のように使用されます。

import akka.pattern.RX
implicit val timeout = akka.util.Timeout(10 seconds)
case object Req

val system = ActorSystem("test")
val source = system.actorOf(Props[Source],"thesource")

class Source() extends Actor {
  def receive : Receive = {
     case Req =>
       val s = sender()
       Observable.interval(1 second).take(5).materialize.subscribe{s ! _}
  }
}

val obs = source ?? Req
obs.observeOn(rx.lang.scala.schedulers.ExecutorScheduler(system.dispatcher)).subscribe((l : Any) => println ("onnext : " + l.toString),
              (error : Throwable) => { error.printStackTrace ; system.shutdown() },
              () => { println("completed, shutting system down"); system.shutdown() })

そして、この出力を生成します:

onnext : 0
onnext : 1
onnext : 2
onnext : 3
onnext : 4
completed, shutting system down

ソースは以下。これは、AskSupport.scala の修正版です。

package akka.pattern

/*
 * File : RxSupport.scala
 * This package is a modified version of 'AskSupport' to provide methods to 
 * support RX Observables.
 */

import rx.lang.scala.{Observable,Subject,Notification}
import java.util.concurrent.TimeoutException
import akka.util.Timeout
import akka.actor._
import scala.concurrent.ExecutionContext
import akka.util.Unsafe
import scala.annotation.tailrec
import akka.dispatch.sysmsg._

class RxTimeoutException(message: String, cause: Throwable) extends TimeoutException(message) {
  def this(message: String) = this(message, null: Throwable)
  override def getCause(): Throwable = cause
}

trait RxSupport {
  implicit def toRx(actorRef : ActorRef) : RxActorRef = new RxActorRef(actorRef)
  def toObservable(actorRef : ActorRef, message : Any)(implicit timeout : Timeout) : Observable[Any] = actorRef ?? message
  implicit def toRx(actorSelection : ActorSelection) : RxActorSelection = new RxActorSelection(actorSelection)
  def toObservable(actorSelection : ActorSelection, message : Any)(implicit timeout : Timeout): Observable[Any] = actorSelection ?? message
}

final class RxActorRef(val actorRef : ActorRef) extends AnyVal {
  def toObservable(message : Any)(implicit timeout : Timeout) : Observable[Any] = actorRef match {
    case ref : InternalActorRef if ref.isTerminated =>
      actorRef ! message
      Observable.error(new RxTimeoutException(s"Recepient[$actorRef] has alrady been terminated."))
    case ref : InternalActorRef =>
      if (timeout.duration.length <= 0)
        Observable.error(new IllegalArgumentException(s"Timeout length must not be negative, message not sent to [$actorRef]"))
      else {
        val a = RxSubjectActorRef(ref.provider, timeout, targetName = actorRef.toString)
        actorRef.tell(message, a)
        a.result.doOnCompleted{a.stop}.timeout(timeout.duration)
      }
  }
  def ??(message :Any)(implicit timeout : Timeout) : Observable[Any] = toObservable(message)(timeout)
}

final class RxActorSelection(val actorSel : ActorSelection) extends AnyVal {
  def toObservable(message : Any)(implicit timeout : Timeout) : Observable[Any] = actorSel.anchor match {
    case ref : InternalActorRef =>
      if (timeout.duration.length <= 0)
        Observable.error(new IllegalArgumentException(s"Timeout length must not be negative, message not sent to [$actorSel]"))
      else {
        val a = RxSubjectActorRef(ref.provider, timeout, targetName = actorSel.toString)
        actorSel.tell(message, a)
         a.result.doOnCompleted{a.stop}.timeout(timeout.duration)
      }
    case _ => Observable.error(new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorSel]"))
  }
  def ??(message :Any)(implicit timeout : Timeout) : Observable[Any] = toObservable(message)(timeout)
}


private[akka] final class RxSubjectActorRef private (val provider : ActorRefProvider, val result: Subject[Any]) extends MinimalActorRef {
  import RxSubjectActorRef._
  import AbstractRxActorRef.stateOffset
  import AbstractRxActorRef.watchedByOffset

  /**
   * As an optimization for the common (local) case we only register this RxSubjectActorRef
   * with the provider when the `path` member is actually queried, which happens during
   * serialization (but also during a simple call to `toString`, `equals` or `hashCode`!).
   *
   * Defined states:
   * null                  => started, path not yet created
   * Registering           => currently creating temp path and registering it
   * path: ActorPath       => path is available and was registered
   * StoppedWithPath(path) => stopped, path available
   * Stopped               => stopped, path not yet created
   */
  @volatile
  private[this] var _stateDoNotCallMeDirectly: AnyRef = _

  @volatile
  private[this] var _watchedByDoNotCallMeDirectly: Set[ActorRef] = ActorCell.emptyActorRefSet

  @inline
  private[this] def watchedBy: Set[ActorRef] = Unsafe.instance.getObjectVolatile(this, watchedByOffset).asInstanceOf[Set[ActorRef]]

  @inline
  private[this] def updateWatchedBy(oldWatchedBy: Set[ActorRef], newWatchedBy: Set[ActorRef]): Boolean =
    Unsafe.instance.compareAndSwapObject(this, watchedByOffset, oldWatchedBy, newWatchedBy)

  @tailrec // Returns false if the subject is already completed
  private[this] final def addWatcher(watcher: ActorRef): Boolean = watchedBy match {
    case null => false
    case other => updateWatchedBy(other, other + watcher) || addWatcher(watcher)
  }

  @tailrec
  private[this] final def remWatcher(watcher: ActorRef): Unit = watchedBy match {
    case null => ()
    case other => if (!updateWatchedBy(other, other - watcher)) remWatcher(watcher)
  }

  @tailrec
  private[this] final def clearWatchers(): Set[ActorRef] = watchedBy match {
    case null => ActorCell.emptyActorRefSet
    case other => if (!updateWatchedBy(other, null)) clearWatchers() else other
  }

  @inline
  private[this] def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset)

  @inline
  private[this] def updateState(oldState: AnyRef, newState: AnyRef): Boolean =
    Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState)

  @inline
  private[this] def setState(newState: AnyRef): Unit = Unsafe.instance.putObjectVolatile(this, stateOffset, newState)

  override def getParent: InternalActorRef = provider.tempContainer

  def internalCallingThreadExecutionContext: ExecutionContext =
    provider.guardian.underlying.systemImpl.internalCallingThreadExecutionContext

  /**
   * Contract of this method:
   * Must always return the same ActorPath, which must have
   * been registered if we haven't been stopped yet.
   */
  @tailrec
  def path: ActorPath = state match {
    case null =>
      if (updateState(null, Registering)) {
        var p: ActorPath = null
        try {
          p = provider.tempPath()
          provider.registerTempActor(this, p)
          p
        } finally { setState(p) }
      } else path
    case p: ActorPath       => p
    case StoppedWithPath(p) => p
    case Stopped =>
      // even if we are already stopped we still need to produce a proper path
      updateState(Stopped, StoppedWithPath(provider.tempPath()))
      path
    case Registering => path // spin until registration is completed
  }

  override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match {
    case Stopped | _: StoppedWithPath => provider.deadLetters ! message
    case _ =>
      if (message == null) throw new InvalidMessageException("Message is null")
      else
        message match {
          case n : Notification[Any] => n.accept(result)
          case other                 => result.onNext(other)
        }
  }

  override def sendSystemMessage(message: SystemMessage): Unit = message match {
    case _: Terminate                      => stop()
    case DeathWatchNotification(a, ec, at) => this.!(Terminated(a)(existenceConfirmed = ec, addressTerminated = at))
    case Watch(watchee, watcher) =>
      if (watchee == this && watcher != this) {
        if (!addWatcher(watcher))
           // NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS
          watcher.sendSystemMessage(DeathWatchNotification(watchee, existenceConfirmed = true, addressTerminated = false))
      } else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this))
    case Unwatch(watchee, watcher) =>
      if (watchee == this && watcher != this) remWatcher(watcher)
      else System.err.println("BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, this))
    case _ =>
  }

  @deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") override def isTerminated: Boolean = state match {
    case Stopped | _: StoppedWithPath => true
    case _                            => false
  }

  @tailrec
  override def stop(): Unit = {
    def ensureCompleted(): Unit = {
      result.onError(new ActorKilledException("Stopped"))
      val watchers = clearWatchers()
      if (!watchers.isEmpty) {
        watchers foreach { watcher =>
          // NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS
          watcher.asInstanceOf[InternalActorRef]
            .sendSystemMessage(DeathWatchNotification(watcher, existenceConfirmed = true, addressTerminated = false))
        }
      }
    }
    state match {
      case null => // if path was never queried nobody can possibly be watching us, so we don't have to publish termination either
        if (updateState(null, Stopped)) ensureCompleted() else stop()
      case p: ActorPath =>
        if (updateState(p, StoppedWithPath(p))) { try ensureCompleted() finally provider.unregisterTempActor(p) } else stop()
      case Stopped | _: StoppedWithPath => // already stopped
      case Registering                  => stop() // spin until registration is completed before stopping
    }
  }
}

private[akka] object RxSubjectActorRef {
  private case object Registering
  private case object Stopped
  private final case class StoppedWithPath(path : ActorPath)

  def apply(provider: ActorRefProvider, timeout: Timeout, targetName: String): RxSubjectActorRef = {
    val result = Subject[Any]()
    new RxSubjectActorRef(provider, result)
    /*timeout logic moved to RxActorRef/Sel*/
  }
}
/*
 * This doesn't work, need to create as a Java class for some reason ...
final object AbstractRxActorRef {
    final val stateOffset = Unsafe.instance.objectFieldOffset(RxSubjectActorRef.getClass.getDeclaredField("_stateDoNotCallMeDirectly"))
    final val watchedByOffset = Unsafe.instance.objectFieldOffset(RxSubjectActorRef.getClass.getDeclaredField("_watchedByDoNotCallMeDirectly"))
}*/

package object RX extends RxSupport

2015-09-10 更新

??演算子を実装するための簡単なコードをここに追加すると思います。これは上記とは少し異なります。a) ネットワーク経由のデータをサポートしていない、b) を返すObservable[Observable[A]]ため、応答の同期が容易になります。利点は、Akka の内部を混乱させないことです。

object TypedAskSupport {
  import scala.concurrent.Future
  import akka.actor.{ActorRef,ActorSelection}
  import scala.reflect.ClassTag

  implicit class TypedAskableActorRef(actor : ActorRef) {
    val converted : akka.pattern.AskableActorRef = actor
    def ?[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout) : Future[Observable[R]] =
      converted.ask(topic).mapTo[Observable[R]]
    def ??[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[Observable[R]] =
      Observable.from (this.?[R](topic)(timeout))
    def ?[R](topic : Request[R])(implicit timeout : akka.util.Timeout) : Future[R] =
      converted.ask(topic).asInstanceOf[Future[R]]
   def ??[R](topic : Request[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[R] =
      Observable.from { this.?[R](topic)(timeout) }
  }

  implicit class TypedAskableActorSelection(actor : ActorSelection) {
    val converted : akka.pattern.AskableActorSelection = actor
    def ?[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout) : Future[Observable[R]] =
      converted.ask(topic).mapTo[Observable[R]]
    def ??[R](topic : Subscribe[R])(implicit timeout : akka.util.Timeout, execCtx : scala.concurrent.ExecutionContext) : Observable[Observable[R]] =
      Observable.from (this.?[R](topic)(timeout))
    def ?[R](topic : Request[R])(implicit timeout : akka.util.Timeout) : Future[R] =
      converted.ask(topic).asInstanceOf[Future[R]]
  }
}
于 2014-04-15T11:16:48.380 に答える