2

Typesafe Redis Play pluginを使用して Publish-Subscribe のために Redis に接続しようとしています。

毎秒メッセージを生成するアクターで構成される次のテスト シナリオがあります。

  // Initialization happens in Application.scala,
  private lazy val fakeStreamActor = Akka.system.actorOf(Props[FakeStreamActor])

  val actorPut = Akka.system.scheduler.schedule(
    Duration(1000, MILLISECONDS),
    Duration(1000, MILLISECONDS),
    fakeStreamActor,
    Put("This is a sample message"))

アクター ソース:

class FakeStreamActor extends Actor {
  implicit val timeout = Timeout(1, SECONDS)

  val CHANNEL = "channel1"
  val plugin = Play.application.plugin(classOf[RedisPlugin]).get
  val listener = new MyListener()

  val pool = plugin.sedisPool

  pool.withJedisClient{ client =>
    client.subscribe(listener, CHANNEL)
  }

  def receive = {

    case Put(msg: String) => {
      //send data to Redis
      Logger.info("Push %s".format(msg))
      pool.withJedisClient { client =>
        client.publish(CHANNEL, msg)
      }

    }
  }
}

/** Messages */
case class Put(msg: String)

サブスクライブ リスナー:

case class MyListener() extends JedisPubSub {
  def onMessage(channel: String, message: String): Unit = {
    Logger.info("onMessage[%s, %s]".format(channel, message))
  }

  def onSubscribe(channel: String, subscribedChannels: Int): Unit = {
    Logger.info("onSubscribe[%s, %d]".format(channel, subscribedChannels))
  }

  def onUnsubscribe(channel: String, subscribedChannels: Int): Unit = {
    Logger.info("onUnsubscribe[%s, %d]".format(channel, subscribedChannels))
  }

  def onPSubscribe(pattern: String, subscribedChannels: Int): Unit = {
    Logger.info("onPSubscribe[%s, %d]".format(pattern, subscribedChannels))
  }

  def onPUnsubscribe(pattern: String, subscribedChannels: Int): Unit = {
    Logger.info("onPUnsubscribe[%s, %d]".format(pattern, subscribedChannels))
  }

  def onPMessage(pattern: String, channel: String, message: String): Unit = {
    Logger.info("onPMessage[%s, %s, %s]".format(pattern, channel, message))
  }
}

ここで、理想的には、定義されたチャネルにサブスクライブし、ログでリスナーが受信したメッセージを毎秒どのように処理しているかを確認する必要があります。しかし、サブスクライブの行為がスレッドをロックするため、それは起こりません。

私の質問は:

Play の非同期性を利用してノンブロッキング サブスクリプションを利用する方法はありますか?

4

1 に答える 1

4

うん。これは、Global.scala で行う方法です。

Akka.future { 
  val j = new RedisPlugin(app).jedisPool.getResource
  j.subscribe(PubSub, "*")
}

プラグインのインスタンス化に問題がありましたが、基本的に withJedisClient ビットを future ブロック内に配置していました。

scala でプラグインをインスタンス化する方法を教えてくれてありがとう!

于 2013-02-20T19:02:24.573 に答える