2

参考までに、この質問のルーツはScala メソッドのパフォーマンス (collect または foreach またはその他) がソケットを介してループしていますか?

アクター内に websocket への参照を保存し、そのアクターを Akka EventStream にサブスクライブしています。

val socketActor = system.actorOf(Props(new Actor {
  val socket = WebSocketConnection

  def receive = {
    case d: AppMessage ⇒ socket.send(d)
  }
}))
system.eventStream.subscribe(socketActor, classOf[AppMessage])

私を悩ませているのは、EventStream で作成できる唯一の分類子がクラス型であることです。メッセージをさまざまなアクターにルーティングする場合、たとえば userId に基づいて、複数の EventStream を作成し、EventBus を手動で構築する必要がありますか、それとも何か不足していますか?

次のような簡単なことができればいいのですが。

system.eventStream.subscribe(socketActor, Map("userId" -> userId, "teamId" -> teamId) )

EventStream が何を表しているのかよくわからないので、これは単に概念上の問題かもしれません。

4

2 に答える 2

3

ActorEventBusこれは、この Gist に基づく私のソリューションでした: https://gist.github.com/3757237

これは、EventStream を扱うよりもはるかに保守しやすいことがわかりました。将来的には複数の EventStream が必要になるかもしれませんが、現時点では、現在のインフラストラクチャを容易にサポートできます。

メッセージバス

まず、PubSub チャネルに基づいて、アクターでラップされたソケットへの送信メッセージを処理する MessageBus :

case class MessageEvent(val channel:String, val message:String)

/**
 * message bus to route messages to their appropriate contexts
 */
class MessageBus extends ActorEventBus with LookupClassification {

    type Event = MessageEvent
  type Classifier = String

  protected def mapSize(): Int = {
    10
  }

  protected def classify(event: Event): Classifier = {
    event.channel
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event
  }

}


object MessageBus {

  val actorSystem = ActorSystem("contexts")
  val Bus = new MessageBus

  /**
   * create an actor that stores a browser socket
   */
  def browserSocketContext(s: WebSocketConnection, userId: Long, teamId: Long) = {
    val subscriber = actorSystem.actorOf(Props(new BrowserSocket(s,userId,teamId)))

    Bus.subscribe( subscriber, "/app/socket/%s" format s.toString)
    Bus.subscribe( subscriber, "/app/browser/u/%s" format userId )
    Bus.subscribe( subscriber, "/app/browser/t/%s" format teamId )
    Bus.subscribe( subscriber, "/app/browser" )
  }
}

アクターによるソケット アクセス

実際にソケットを含むアクターは次のとおりです。

/**
 * actor wrapping access for browser socket
 */
class BrowserSocket(
  val s: WebSocketConnection,
  val userId: Long,
  val teamId: Long

) extends Actor {

  def receive = {
    case payload:MessageEvent => 
      s.send(payload.message)

    case ping:MessagePing =>
      s.ping(ping.data)

  }

}
于 2012-09-21T16:54:48.920 に答える
1

EventStreams と Event Bus は、私の知る限り、ログと監視用です。通常は、アクターを使用して必要な機能を構築し、アクター間でメッセージを渡します。

そのため、送信先のAppMessageバッキング アクターを選別するカスタム ルーター アクターに を送信します。おそらく、ルーターは適切と思われる場合はバッキング アクターをスポーンできます。または、アクターは (適切なメッセージを渡すことによって) ルーターでサブスクライブできます。これは主に、実装する必要があるロジックに依存します。

于 2012-09-13T23:00:27.053 に答える