2

Akka を使用して自分の Scala アプリケーションに redis を統合していますが、何らかの理由でメッセージを受信しません。コマンドラインで redis-cli を開くと、redis 側に大量のトラフィックがあることを確認できます。

pSubscribe の後、次を受け取ります。subscribed to * and count = 1

私の推測では、Akka がコールバックを受信するように設定されている方法に関連している可能性があります。scala-redis lib の Scala アクターを削除し、いくつかの競合のために Akka アクターに置き換える必要がありました。

コードは次のとおりです。

サブスクライバー アクター

class Subscriber(client: RedisClient) extends Actor {
  var callback: PubSubMessage => Any = { m => }

  def receive: Receive = { 
    case Subscribe(channels) =>
      client.subscribe(channels.head, channels.tail: _*)(callback)

    case pSubscribe(channels) =>
      client.pSubscribe(channels.head, channels.tail: _*)(callback)

    case pSubscribeAll(channels) =>
      Logger.info("Subscribing to all channels")
      client.pSubscribe(channels.head, channels.tail: _*)(callback)

    case Register(cb) =>
      Logger.info("Callback is registered")
      callback = cb

    case Unsubscribe(channels) =>
      client.unsubscribe(channels.head, channels.tail: _*)

    case UnsubscribeAll =>
      client.unsubscribe
  }
}

サブスクライバーの初期化

class RelaySub extends Actor {

  // important config values
  val system = ActorSystem("pubsub")
  val conf = play.api.Play.current.configuration
  val relayPubHost = conf.getString("relays.redis.host").get
  val relayPubPort = conf.getInt("relays.redis.port").get

  val rs = new RedisClient(relayPubHost, relayPubPort)
  val s = system.actorOf(Props(new Subscriber(rs)))
  s ! Register(callback) 
  s ! pSubscribeAll(Array("*"))
  Logger.info("Engine Relay Subscriber has started up")

  def receive: Receive = {      
    case Register(callback) =>
  }

  def callback(pubsub: PubSubMessage) = pubsub match {
    case S(channel, no) => Logger.info("subscribed to " + channel + " and count = " + no)
      case U(channel, no) => Logger.info("unsubscribed from " + channel + " and count = " + no)
      case M(channel, msg) => 
        msg match {
          // exit will unsubscribe from all channels and stop subscription service
          case "exit" => 
            Logger.info("unsubscribe all ... no handler yet ;)")

          // message "+x" will subscribe to channel x
          case x if x startsWith "+" => 
            Logger.info("subscribe to ... no handler yet ;)")

          // message "-x" will unsubscribe from channel x
          case x if x startsWith "-" => 
            Logger.info("unsubscribe from ... no handler yet ;)")

          // other message receive
          case x => 
            Logger.info("Engine: received redis message")
            val channelVars = channel.split(".").toArray[String]
            if(channelVars(0)!=Engine.instanceID)
                channelVars(1) match {
                  case "relay" => 
                    EngineSyncLocal.constructRelay(channel, msg)
                  case _ => 
                    Logger.error("Engine: received unknown redis message")
                }
        }
  }
}

ご協力いただきありがとうございます!

4

2 に答える 2

2

問題を見つけました。これはscala-redisクライアントのバグのようです。

コンシューマークラスにログを追加し、Engine: weird messageエラーを受信し始めました。これは、着信トラフィックを認識しないことを意味します。作者に連絡してプルリクエストを出します。

コード:

class Consumer(fn: PubSubMessage => Any) extends Runnable {

    def start () {
      val myThread = new Thread(this) ;
      myThread.start() ;
    }

    def run {
      whileTrue {
        asList match {
          case Some(Some(msgType) :: Some(channel) :: Some(data) :: Nil) =>
            Logger.info("Engine: redis traffic")
            msgType match {
              case "subscribe" | "psubscribe" => fn(S(channel, data.toInt))
              case "unsubscribe" if (data.toInt == 0) => 
                fn(U(channel, data.toInt))
                break
              case "punsubscribe" if (data.toInt == 0) => 
                fn(U(channel, data.toInt))
                break
              case "unsubscribe" | "punsubscribe" => 
                fn(U(channel, data.toInt))
              case "message" | "pmessage" => 
                fn(M(channel, data))
              case x => throw new RuntimeException("unhandled message: " + x)
            }
          case _ => Logger.error("Engine: weird redis message")
        }
      }
    }
  }
于 2012-05-28T19:19:13.287 に答える
0
            case x => throw new RuntimeException("unhandled message: " + x)
          }
case Some(Some("pmessage")::Some(pattern)::Some(channel):: Some(message)::Nil)=>
              fn(M(channel, message))

asList の一致にケースがありません

于 2012-10-17T16:31:32.440 に答える