Akka アクターをメールボックスとしてのみ使用したい、つまり、それぞれが 1 つのリモート アクターを作成する n 個のスレッドを作成したい。
各スレッドは、他のスレッドのすべてのリモート アクターへの参照を取得し、それぞれのアクターを介して相互にメッセージを送信できるようにします。
アクターは次のように定義されます。
case class Receive
case class GroupReceive(id: Symbol)
case class GroupMsg[T](id: Symbol, msg: T)
class FooParActor(val distributor: Distributor) extends Actor
with Stash {
import context._
val globalRank: Int = distributor.globalRank
def doReceive(realSender: ActorRef, ID: Symbol) {
unstashAll()
become({
case GroupMsg(ID, msg) =>
realSender ! msg
unbecome()
case GroupMsg(otherId, msg) =>
println(globalRank + ": stashing " + otherId)
unbecome()
case x => sys.error("bad msg: " + x)
}, discardOld = false)
}
def receive = {
case GroupReceive(id) =>
doReceive(sender, id)
case GroupMsg(id, x) =>
stash()
case x => sys.error("bad msg: " + x)
}
}
メッセージを読むために、所有者スレッドはローカルのアクターに送信GroupReceive('someSymbol)
し、次にそのローカル アクターが GroupMsg をスレッドに転送します。メッセージを読み取るためのスレッドの観点からのコードは、次のようになります。
def groupRcv[T](id:Symbol) = Await.result(aref ? GroupReceive(id), timeout.duration).asInstanceOf[T]
はaref
、このスレッドのローカル アクターへの参照です。
非常に単純な使用法と小さなメッセージでも、上記のパターンでデッドロック (5 秒のタイムアウト) が発生することがあります。GroupReceive(id)
メッセージを受信した後、 doReceive(...): の最初のケースに入る前にアクターが失速するという問題に絞り込みましたcase GroupMsg(ID, msg) =>
。
doReceive 呼び出しに行く前に、アクターが実際に隠し場所にメッセージを持っていることを確認するために printout-traces を作成しましたが、何らかの理由でそれらを処理していないようです。上に示したコードは、 がの隠し場所GroupMsg()
から失われる状態になる可能性がありますか? または、アクターがメッセージFooParActor
を受信した後にデッドロックに陥る他の方法はありますか?GroupReceive()