1

実際、アクター (ルーター) システムを正しく動作させるのに問題があります。私のセットアップ:

Play コントローラー内で akka ルーターを使用しようとしています。依存性注入には scaldi を使用します。

スカルディモジュール:

class UserDAOModule extends Module {
  binding to new ExampleRouter
  binding toProvider new UserDAOWorker
}

akka ルーター:

class UserDAORouter(implicit inj:Injector) extends Actor with AkkaInjectable {

  val userDAOProps = injectActorProps[UserDAOWorker]

  var router = {
    val routees = Vector.fill(5) {
      val r = context.actorOf(userDAOProps)
      context watch r
      ActorRefRoutee(r)
    }
    Router(RoundRobinRoutingLogic(), routees)
  }

  override def receive: Receive = {
    case mm: MongoDBMessage =>
      router.route(mm, sender)
    case Terminated(a) =>
      router = router.removeRoutee(a)
      val r = context.actorOf(userDAOProps)
      context watch r
      router = router.addRoutee(r)
  }

}

ワーカー:

class UserDAOWorker(implicit inj:Injector) extends Actor with Injectable {

  val db = inject[DefaultDB]
  val collection:JSONCollection = db("users")
  val currentSender = sender

  override def receive: Receive = {
    case InsertUser(user) => insertUser(user)
  }

  def insertUser(user:User) = {
    collection.save(user).onComplete {
      case Failure(e) => currentSender ! new UserDAOReturnMessage(Some(e), None)
      case Success(lastError) => currentSender ! new UserDAOReturnMessage(None, lastError)
    }
  }
}

メッセージ (insertUser メッセージ) をルーターに送信すると、メッセージは正しくルーティングされ、ワー​​カーはメッセージを受信しますが、ワーカーがメッセージを送信者に送り返すと、配信できないため、デッド レター オフィスに送信されます。これを修正する方法がわかりません。私を助けることができる人はいますか?

前もって感謝します

4

1 に答える 1

1

問題は、アクターの作成時にコンストラクターで currentSender が null (つまり ActorRef.noSender) で初期化されることだと思います。「送信者」は、 receive() でメッセージを受信するコンテキストでのみ有効です。ActorRef.noSender にメッセージを送信することは、配信不能キューに送信することと同じです。

このようなものが動作するはずです:

class UserDAOWorker(implicit inj:Injector) extends Actor with Injectable {

  val db = inject[DefaultDB]
  val collection:JSONCollection = db("users")

  override def receive: Receive = {
    case InsertUser(user) => {
      insertUser(sender, user)
    }
  }

  def insertUser(currentSender : ActorRef, user:User) = {
    collection.save(user).onComplete {
      case Failure(e) => currentSender ! new UserDAOReturnMessage(Some(e), None)
      case Success(lastError) => currentSender ! new UserDAOReturnMessage(None, lastError)
    }
  }
}
于 2014-09-26T22:47:13.400 に答える