0

私は Akka Cluster (バージョン 2.4.10) を使用しており、「フロントエンド」の役割に指定されたいくつかのノードと、「ワーカー」として指定されたいくつかのノードがあります。ワーカーはリモート マシン上にあります。着信作業は、フロントエンド アクターによってラウンド ロビン ルーティングによってワーカーに分散されます。問題は、「ワーカー」からの応答をフロントエンド アクターに送り返すことです。作業員が作業を完了しているのがわかります。しかし、ワーカーからフロントエンドに送信されたメッセージは届かず、配信不能になります。ログに以下のエラーが表示されます。

[Cluster-akka.actor.default-dispatcher-21] [akka://Cluster/deadLetters] Message [scala.collection.immutable.$colon$colon] from Actor[akka://Cluster/user] to Actor[akka://Cluster/deadLetters] was not delivered. [6] dead letters encountered.

私はこれを見て、コードで同じことに従っています。これも見ましたが、事前にルートがわからないため、提案された解決策はこの場合には適用されません。それは構成を通じてもたらされ、変更される可能性があります。ラウンドロビン ルーターの構成は次のとおりです。

akka.actor.deployment {
  /frontEnd/hm = {
    router = round-robin-group
    nr-of-instances = 5
    routees.paths = ["/user/hmWorker"]
    cluster {
      enabled = on
      use-role = backend
      allow-local-routees = on
    }
  }
}

ルーターは、以下のようにフロントエンド アクターでインスタンス化されます。

val router = context.actorOf(FromConfig.props(), name = "hm")
val controller = context.actorOf(Props(classOf[Controller], router))

コントローラーとワーカー コードは次のとおりです。

// Node 1 : Controller routes requests using round-robin
class Controller(router: ActorRef) extends Actor {

    val list = List("a", "b") // Assume this is a big list

    val groups = list.grouped(500)

    override def receive: Actor.Receive = {
      val futures = groups.map(grp => (router ? Message(grp)).mapTo[List[String]]))
      val future = Future.sequence(futures).map(_.flatten)
      val result = Await.result(future, 50 seconds)
      println(s"Result is $result")
    }
}

// Node 2
class Worker extends Actor {

    override def receive: Actor.Receive = {
      case Message(lst) =>
            val future: Future[List[String]] = // Do Something asynchronous
            future onComplete {
                case Success(r) => sender.!(r)(context.parent) // This message is not delivered to Controller actor.
                case Failure(th) => // Error handling
            }
    }
}

ここで私が間違っていることを教えてください。あなたの助けに感謝。

4

1 に答える 1

2

sender()のコールバックでは使用しないでくださいFuture。コールバックが処理されるまでに、sender()はメッセージを受信したときとは異なる何かを参照している可能性があります。

次のように、最初にコールバックの外で参照を保存することを検討してください。

override def receive: Actor.Receive = {
  case Message(lst) =>
        val future: Future[List[String]] = // Do Something asynchronous
        val replyTo: ActorRef = sender()
        future onComplete {
            case Success(r) => replyTo.!(r)(context.parent) // This message is not delivered to Controller actor.
            case Failure(th) => // Error handling
        }
}

または、さらに良いことに、パイプ パターンを使用します。

import akka.pattern.pipe
override def receive: Actor.Receive = {
  case Message(lst) =>
    val future: Future[List[String]] = // Do Something asynchronous
    future.pipeTo(sender())
}
于 2016-10-07T15:35:29.817 に答える