Akka を使用して自分の Scala アプリケーションに redis を統合していますが、何らかの理由でメッセージを受信しません。コマンドラインで redis-cli を開くと、redis 側に大量のトラフィックがあることを確認できます。
pSubscribe の後、次を受け取ります。subscribed to * and count = 1
私の推測では、Akka がコールバックを受信するように設定されている方法に関連している可能性があります。scala-redis lib の Scala アクターを削除し、いくつかの競合のために Akka アクターに置き換える必要がありました。
コードは次のとおりです。
サブスクライバー アクター
class Subscriber(client: RedisClient) extends Actor {
var callback: PubSubMessage => Any = { m => }
def receive: Receive = {
case Subscribe(channels) =>
client.subscribe(channels.head, channels.tail: _*)(callback)
case pSubscribe(channels) =>
client.pSubscribe(channels.head, channels.tail: _*)(callback)
case pSubscribeAll(channels) =>
Logger.info("Subscribing to all channels")
client.pSubscribe(channels.head, channels.tail: _*)(callback)
case Register(cb) =>
Logger.info("Callback is registered")
callback = cb
case Unsubscribe(channels) =>
client.unsubscribe(channels.head, channels.tail: _*)
case UnsubscribeAll =>
client.unsubscribe
}
}
サブスクライバーの初期化
class RelaySub extends Actor {
// important config values
val system = ActorSystem("pubsub")
val conf = play.api.Play.current.configuration
val relayPubHost = conf.getString("relays.redis.host").get
val relayPubPort = conf.getInt("relays.redis.port").get
val rs = new RedisClient(relayPubHost, relayPubPort)
val s = system.actorOf(Props(new Subscriber(rs)))
s ! Register(callback)
s ! pSubscribeAll(Array("*"))
Logger.info("Engine Relay Subscriber has started up")
def receive: Receive = {
case Register(callback) =>
}
def callback(pubsub: PubSubMessage) = pubsub match {
case S(channel, no) => Logger.info("subscribed to " + channel + " and count = " + no)
case U(channel, no) => Logger.info("unsubscribed from " + channel + " and count = " + no)
case M(channel, msg) =>
msg match {
// exit will unsubscribe from all channels and stop subscription service
case "exit" =>
Logger.info("unsubscribe all ... no handler yet ;)")
// message "+x" will subscribe to channel x
case x if x startsWith "+" =>
Logger.info("subscribe to ... no handler yet ;)")
// message "-x" will unsubscribe from channel x
case x if x startsWith "-" =>
Logger.info("unsubscribe from ... no handler yet ;)")
// other message receive
case x =>
Logger.info("Engine: received redis message")
val channelVars = channel.split(".").toArray[String]
if(channelVars(0)!=Engine.instanceID)
channelVars(1) match {
case "relay" =>
EngineSyncLocal.constructRelay(channel, msg)
case _ =>
Logger.error("Engine: received unknown redis message")
}
}
}
}
ご協力いただきありがとうございます!