1

リモート akka ActorSystemのログ ストリームをサブスクライブしようとしています。基本的には、リモート アクターの実行中のログを表示するコンソールを作成します。

これを行う唯一の方法は次のとおりです。ロギング ActorSystem 内で Actor を作成し、その Actor を ActorSystem.eventStream にサブスクライブさせてから、コンソールの ActorSystem 内から actorSelectionを使用してその Actor にサブスクライブします。

しかし、ログ パイプラインは次のようになるため、これは非常に「間接的」に見えます。

logging Actor --> eventStream --> Actor subscribed to eventStream --> local Actor

イベント ストリームをサブスクライブする簡単な方法はありますか?

4

1 に答える 1

2

単純化の観点から、アクターを追加せずにリモート アクターをイベント ストリームにサブスクライブすることを禁止するものは何もありません。Akkaのドキュメントには次のように記載されています。

イベント ストリームはローカル機能です。つまり、クラスター化された環境内の他のノードにイベントを配信しません (明示的にストリームにリモート アクターをサブスクライブしない限り)。Akka クラスターでイベントをブロードキャストする必要がある場合、受信者を明示的に知らずに (つまり、ActorRef を取得する)、以下を検討することをお勧めします。

説明のために、サブスクライブするリモート システムに対応する次のコード フラグメントを考えてみましょう。

  class PublisherActor extends Actor with ActorLogging { // example publisher actor just to generate some logs
    context.system.scheduler.schedule(1.second, 1.second, self, "echo")
    def receive = {
      case "echo" ⇒
        val x = Random.nextInt(100)
        log.info(s"I got a random number: $x")
    }
  }

  def runPublisher() = {
    println("=== running publisher node ===")
    val system = ActorSystem("PublisherSystem")
    val selection = system.actorSelection("akka.tcp://SubscriberSystem@127.0.0.1:2553/user/subscriber")
    selection.resolveOne(10.seconds) onSuccess { // when the listener actor is available,
      case listener ⇒ system.eventStream.subscribe(listener, classOf[LogEvent]) // subscribe it to the event stream
    }
    val publisher = system.actorOf(Props[PublisherActor], "publisher") // some example publisher
  }

次に、ログを表示する「ローカル」ノードの対応するサブスクライバー:

  class SubscriberActor extends Actor with ActorLogging {
    log.info("subscriber listening...")
    def receive = {
      case msg ⇒ log.info(s"Got: $msg")
    }
  }

  def runSubscriber() = {
    println("=== running subscriber node ===")
    val system = ActorSystem("SubscriberSystem")
    val listener = system.actorOf(Props[SubscriberActor], "subscriber")
  }

ただし、このソリューションにはいくつかの注意点があります。パブリッシャーはサブスクライバーより前に実行する必要がある (または、サブスクライバーがパブリッシャーが起動するまで何らかの再試行ポリシーを実装する) こと、場所がハードコードされていることなどです。より堅牢で回復力のあるシステムが必要であり、それが許容される場合は、ドキュメントのアドバイスに従い、クラスタ化された環境で分散パブリッシャー/サブスクライバーを使用してください。

それが役に立ったことを願っています!

于 2015-06-26T23:24:28.733 に答える