15

子アクターを作成して長い計算を実行するアクターがあります。

問題は、子アクターの初期化に数秒かかり、作成されてから完全に初期化されるまでの間に親アクターが子に送信するすべてのメッセージが破棄されることです。

これは私が使用しているコードのロジックです:

class ChildActor extends Actor {
  val tagger = IntializeTagger(...) // this takes a few seconds to complete

  def receive = {
    case Tag(text) => sender ! tagger.tag(text)
    case "hello" => println("Hello")
    case _ => println("Unknown message")
  }
}

class ParentActor extends Actor {
  val child = context.ActorOf(Props[ChildActor], name = "childactor")

  // the below two messages seem to get lost
  child ! "hello"
  child ! Tag("This is my sample text")

  def receive = {
     ...
  }
}

どうすればその問題を回避できますか? 子が完全に初期化されるまで親アクターを待機させることは可能ですか? 子アクターをルーティングで使用し、場合によってはリモート アクター システムで使用します。

編集

drexin のアドバイスに従って、コードを次のように変更しました。

class ChildActor extends Actor {
  var tagger: Tagger = _

  override def preStart() = {
    tagger = IntializeTagger(...) // this takes a few seconds to complete
  }

  def receive = {
    case Tag(text) => sender ! tagger.tag(text)
    case "hello" => println("Hello")
    case _ => println("Unknown message")
  }
}

class ParentActor extends Actor {
  var child: ActorRef = _

  override def preStart() = {
    child = context.ActorOf(Props[ChildActor], name = "childactor")

    // When I add
    // Thread.sleep(5000)
    // here messages are processed without problems

    // wihout hardcoding the 5 seconds waiting 
    // the below two messages seem to get lost
    child ! "hello"
    child ! Tag("This is my sample text")
  }

  def receive = {
     ...
  }
}

しかし、問題は残ります。私は何が欠けていますか?

4

3 に答える 3

9

Stashあなたが探しているのはとの組み合わせだと思いますbecome。子アクターは初期状態を未初期化に設定し、この状態の間、完全に初期化されるまですべての着信メッセージを隠しておくという考えになります。完全に初期化されたら、動作を初期化状態に切り替える前に、すべてのメッセージを unstash できます。簡単な例は次のとおりです。

class ChildActor2 extends Actor with Stash{
  import context._
  var dep:SlowDependency = _

  override def preStart = {
    val me = context.self
    Future{
      dep = new SlowDependency
      me ! "done"
    }
  }

  def uninitialized:Receive = {
    case "done" => 
      unstashAll
      become(initialized) 
    case other => stash()
  }

  def initialized:Receive = {
    case "a" => println("received the 'a' message")
    case "b" => println("received the 'b' message")   
  }

  def receive = uninitialized
}

preStartアクターの起動を停止しないように、非同期で初期化を行っていることに注意してください。これは、変更可能な変数を閉じるため、少し醜いdepです。代わりに、遅い依存関係のインスタンス化を処理し、それをこのアクターに送り返す別のアクターにメッセージを送信することで、確実に処理できます。become依存関係を受け取ると、状態を呼び出しますinitialized

ここで注意点が 1 つありStashます。Akka のドキュメントから直接貼り付けます。

Please note that the Stash can only be used together with actors that 
have a deque-based mailbox. For this, configure the mailbox-type of the 
dispatcher to be a deque-based mailbox, such as 
akka.dispatch.UnboundedDequeBasedMailbox (see Dispatchers (Scala)). 

これがうまくいかない場合は、より多くのDIタイプのアプローチを試して、コンストラクターを介して遅い依存関係を子アクターに注入することができます。したがって、子アクターを次のように定義します。

class ChildActor(dep:SlowDependency) extends Actor{
    ...
} 

次に、このアクターを起動するときは、次のようにします。

context.actorOf(new Props().withCreator(new ChildActor(slowDep)), name = "child-actor")
于 2013-06-12T12:05:29.187 に答える
0

子アクターから親に「準備完了」メッセージを送信し、このメッセージが受信された後にのみ子アクターへのメッセージの送信を開始することをお勧めします。単純なユースケースのメソッドでのみ行うことも、子が初期化された後に親アクターの動作をreceive()使用becomeまたはFSM変更することもできます (たとえば、子へのメッセージを中間ストレージに保存し、準備ができたらすべて送信します)。 )。

于 2013-06-12T11:31:19.813 に答える