1

私はscalaアクターを使用してコードを並列化しようとしています。これは、アクターを使用した最初の実際のコードですが、Java MulithreadingとCでのMPIの経験があります。しかし、私は完全に失われています。

私が実現したいワークフローは循環パイプラインであり、次のように説明できます。

  • ワーカーアクターは別のアクターへの参照を持っているため、円を形成します
  • メッセージを送信して計算をトリガーできるコーディネーターアクターがありますStartWork()
  • ワーカーがStartWork()メッセージを受信すると、ローカルで処理DoWork(...)を行い、サークル内の隣人にメッセージを送信します。
  • 隣人は他のことをして、順番にDoWork(...)自分の隣人にメッセージを送ります。
  • これは、最初のワーカーがDoWork()メッセージを受信するまで続きます。
  • コーディネーターはGetResult()、最初のワーカーにメッセージを送信して、応答を待つことができます。

重要なのは、コーディネーターはデータの準備ができたときにのみ結果を受け取る必要があるということです。ワーカーは、メッセージに応答する前に、ジョブがジョブに戻るのをどのように待つことができますか?GetResult()

計算を高速化するために、すべてのワーカーはいつでもを受け取ることができますStartWork()

これが、ワーカーの疑似実装の最初の試みです。

class Worker( neighbor: Worker, numWorkers: Int ) {
   var ready = Foo()
   def act() {
     case StartWork() => { 
       val someData = doStuff()
       neighbor ! DoWork( someData, numWorkers-1 ) 
       }
     case DoWork( resultData, remaining ) => if( remaining == 0 ) {
         ready = resultData
       } else {
         val someOtherData = doOtherStuff( resultData )
         neighbor ! DoWork( someOtherData, remaining-1 )
      }
    case GetResult() => reply( ready )
  }
}

コーディネーター側:

worker ! StartWork()
val result = worker !? GetResult() // should wait
4

2 に答える 2

3

GetResultまず、が正しい結果を得ることができるように、単一の作業を構成するものを識別する何らかの識別子が明らかに必要です。Map明らかな解決策は、アクターに結果とMap待機中のゲッターを保持させることだと思います。

class Worker( neighbor: Worker, numWorkers: Int ) {
   var res: Map[Long, Result] = Map.empty
   var gets: Map[Long, OutputChannel[Any]] = Map.empty   
   def act() {
     ...
     case DoWork( id, resultData, remaining ) if remaining == 0 =>
       res += (id -> resultData)
       gets.get(id).foreach(_ ! res(id)) //reply to getters when result is ready
       gets -= id //clear out getter map now?
     case GetResult(id) if res.isDefinedAt(d) => //result is ready
       reply (res(id))
     case GetResult(id) => //no result ready 
       gets += (id -> sender)
   }
}

注:一致条件で を使用するとif、メッセージ処理が少し明確になります。

于 2010-03-17T17:56:29.960 に答える
1

1つの代替案は次のとおりです。

class Worker( neighbor: Worker, numWorkers: Int ) {
   var ready = Foo()
   def act() {
     case StartWork() => { 
       val someData = doStuff()
       neighbor ! DoWork( someData, numWorkers-1 ) 
       }
     case DoWork( resultData, remaining ) => if( remaining == 0 ) {
         ready = resultData
         react {
           case GetResult() => reply( ready )
         }
       } else {
         val someOtherData = doOtherStuff( resultData )
         neighbor ! DoWork( someOtherData, remaining-1 )
      }
  }
}

作業が終了すると、このワーカーはメッセージを受信するまでスタックしGetResultます。一方、コーディネーターGetResultはワーカーが受信するまでメールボックスに残るため、すぐに送信できます。

于 2010-03-17T22:29:06.767 に答える