8

既知の(もちろん)数のアクターによって実行されるジョブの数は不明です。アクターがジョブを完了した後、初期ジョブ数が増える可能性があります。つまり、アクターは、タスクの完了時に、実行する新しいジョブを追加する場合があります。

私がこれを処理する方法は、実行の結果だけでなく、アクターがアイドル状態になったことを示す「フラグ」を使用して、各アクターがジョブの完了時にマスターにメッセージを送信するようにすることです。マスターにはジョブのキューとアイドル状態のアクターのキューがあり、アクターが「ジョブ完了メッセージ」を送信するたびに、マスターはそのアクターが実行する他の何かがあるかどうかを確認します。が空で、アイドルのキューがいっぱいです...その時点でシステムをシャットダウンしました。ここにはあまり監督がないので、ちゃんとやっていない気がします…

ルーターにアイドル状態のアクターを照会する方法が見つからなかったため、ルーターを使用していません。質問は次のとおりです。

上記のAkkaで説明した状況を処理するための「適切な」方法は何ですか?

4

3 に答える 3

8

Akkaのルーティング機能 を確認する必要があります。SmallestMailboxRouterあなたが探しているものかもしれません。

別の方法として、オンデマンドでアクターを作成することもできます。つまり、すべてのタスクに対して、新しいアクターが動的に作成されます。中央のアクターは、現在アクティブなすべてのアクターを追跡します。ワーカーアクターが完了すると、それ自体にaPoisonPillを送信し、マスターにシャットダウンについて通知します(アクティブに、またはTerminateAkkaが監視アクターに送信する標準メッセージを介して)。アクティブなアクターがなくなると、つまりタスクがなくなると、コントローラーアクターはシステムをシャットダウンします。

コメントを読んだ後の追加:SmallestMailboxLike、によって混合されたScalaトレイト のソースを見てくださいSmallestMailboxRouter。警告:Scalaの基本的な知識が必要です。しかし、Akkaを使用したい場合は、とにかくこれは一般的に良い考えです...方法は次のisProcessingMessage(ActorRef)ように理解できます。isNotIdle(ActorRef)

// Returns true if the actor is currently processing a message.
// It will always return false for remote actors.
// Method is exposed to subclasses to be able to implement custom
// routers based on mailbox and actor internal state.
protected def isProcessingMessage(a: ActorRef): Boolean = a match {
  case x: LocalActorRef ?
    val cell = x.underlying
    cell.mailbox.isScheduled && cell.currentMessage != null
  case _ ? false
}

// Returns true if the actor currently has any pending messages
// in the mailbox, i.e. the mailbox is not empty.
// It will always return false for remote actors.
// Method is exposed to subclasses to be able to implement custom
// routers based on mailbox and actor internal state.
protected def hasMessages(a: ActorRef): Boolean = a match {
  case x: LocalActorRef ? x.underlying.mailbox.hasMessages
  case _                ? false
}
于 2012-07-07T19:36:06.377 に答える
1

もう1つの戦略は、BalancingDispatcherとRoundRobinRouterを(アクターの「プール」として)使用することです。Akkaドキュメントから:

BalancingDispatcher
# This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors.




# All the actors share a single Mailbox that they get their messages from.

It is assumed that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors; i.e. the actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.

# Sharability: Actors of the same type only

# Mailboxes: Any, creates one for all Actors

# Use cases: Work-sharing

application.confでディスパッチャを定義するか、起動時にプログラムでロードします。

private final static Config akkaConfig = ConfigFactory.parseString(

            "my-dispatcher.type = BalancingDispatcher \n" +
            "my-dispatcher.executor = fork-join-executor \n" +
            "my-dispatcher.fork-join-executor.parallelism-min = 8 \n" +
            "my-dispatcher.fork-join-executor.parallelism-factor = 3.0 \n" +
            "my-dispatcher.fork-join-executor.parallelism-max = 64 "
);

次に、ルートのルーターとディスパッチャーを定義します。

getContext().actorOf(new Props(MyActor.class).withRouter(new RoundRobinRouter(10)).withDispatcher("my-dispatcher"), "myActor");

したがって、ルーターは単に「配信」メッセージを送信し、ディスパッチャーは選択されたアクターを実行します(そして、ワークスティーリングも実装します)

于 2012-07-27T15:14:00.983 に答える
-1

Balancingディスパッチャーは、BalancingDispatcherで作成されたすべての作成されたアクターに対して1つのメールボックスのみを使用します。だからそれはあなたの仕事を簡単にするでしょう。

于 2017-07-27T05:30:10.167 に答える