ActorEventBus
これは、この Gist に基づく私のソリューションでした: https://gist.github.com/3757237
これは、EventStream を扱うよりもはるかに保守しやすいことがわかりました。将来的には複数の EventStream が必要になるかもしれませんが、現時点では、現在のインフラストラクチャを容易にサポートできます。
メッセージバス
まず、PubSub チャネルに基づいて、アクターでラップされたソケットへの送信メッセージを処理する MessageBus :
case class MessageEvent(val channel:String, val message:String)
/**
* message bus to route messages to their appropriate contexts
*/
class MessageBus extends ActorEventBus with LookupClassification {
type Event = MessageEvent
type Classifier = String
protected def mapSize(): Int = {
10
}
protected def classify(event: Event): Classifier = {
event.channel
}
protected def publish(event: Event, subscriber: Subscriber): Unit = {
subscriber ! event
}
}
object MessageBus {
val actorSystem = ActorSystem("contexts")
val Bus = new MessageBus
/**
* create an actor that stores a browser socket
*/
def browserSocketContext(s: WebSocketConnection, userId: Long, teamId: Long) = {
val subscriber = actorSystem.actorOf(Props(new BrowserSocket(s,userId,teamId)))
Bus.subscribe( subscriber, "/app/socket/%s" format s.toString)
Bus.subscribe( subscriber, "/app/browser/u/%s" format userId )
Bus.subscribe( subscriber, "/app/browser/t/%s" format teamId )
Bus.subscribe( subscriber, "/app/browser" )
}
}
アクターによるソケット アクセス
実際にソケットを含むアクターは次のとおりです。
/**
* actor wrapping access for browser socket
*/
class BrowserSocket(
val s: WebSocketConnection,
val userId: Long,
val teamId: Long
) extends Actor {
def receive = {
case payload:MessageEvent =>
s.send(payload.message)
case ping:MessagePing =>
s.ping(ping.data)
}
}