私はscalaアクターを使用してコードを並列化しようとしています。これは、アクターを使用した最初の実際のコードですが、Java MulithreadingとCでのMPIの経験があります。しかし、私は完全に失われています。
私が実現したいワークフローは循環パイプラインであり、次のように説明できます。
- 各ワーカーアクターは別のアクターへの参照を持っているため、円を形成します
- メッセージを送信して計算をトリガーできるコーディネーターアクターがあります
StartWork() - ワーカーが
StartWork()メッセージを受信すると、ローカルで処理DoWork(...)を行い、サークル内の隣人にメッセージを送信します。 - 隣人は他のことをして、順番に
DoWork(...)自分の隣人にメッセージを送ります。 - これは、最初のワーカーが
DoWork()メッセージを受信するまで続きます。 - コーディネーターは
GetResult()、最初のワーカーにメッセージを送信して、応答を待つことができます。
重要なのは、コーディネーターはデータの準備ができたときにのみ結果を受け取る必要があるということです。ワーカーは、メッセージに応答する前に、ジョブがジョブに戻るのをどのように待つことができますか?GetResult()
計算を高速化するために、すべてのワーカーはいつでもを受け取ることができますStartWork()。
これが、ワーカーの疑似実装の最初の試みです。
class Worker( neighbor: Worker, numWorkers: Int ) {
var ready = Foo()
def act() {
case StartWork() => {
val someData = doStuff()
neighbor ! DoWork( someData, numWorkers-1 )
}
case DoWork( resultData, remaining ) => if( remaining == 0 ) {
ready = resultData
} else {
val someOtherData = doOtherStuff( resultData )
neighbor ! DoWork( someOtherData, remaining-1 )
}
case GetResult() => reply( ready )
}
}
コーディネーター側:
worker ! StartWork()
val result = worker !? GetResult() // should wait