7

非常に長時間実行され、計算コストのかかる作業を行う必要のあるアクターがいますが、計算自体は段階的に実行できます。ですから、完全な計算自体は完了するのに何時間もかかりますが、中間結果は実際には非常に有用であり、私はそれらの要求に応答できるようにしたいと思います。これは私がやりたいことの擬似コードです:

var intermediateResult = ...
loop {
     while (mailbox.isEmpty && computationNotFinished)
       intermediateResult = computationStep(intermediateResult)


     receive {
         case GetCurrentResult => sender ! intermediateResult
         ...other messages...
     }
 }
4

4 に答える 4

8

これを行うための最良の方法は、あなたがすでに行っていることに非常に近いです:

case class Continue(todo: ToDo)
class Worker extends Actor {
  var state: IntermediateState = _
  def receive = {
    case Work(x) =>
      val (next, todo) = calc(state, x)
      state = next
      self ! Continue(todo)
    case Continue(todo) if todo.isEmpty => // done
    case Continue(todo) =>
      val (next, rest) = calc(state, todo)
      state = next
      self ! Continue(rest)
  }
  def calc(state: IntermediateState, todo: ToDo): (IntermediateState, ToDo)
}

編集:より多くの背景

アクターが自分自身にメッセージを送信すると、Akkaの内部処理は基本的にwhileループ内でメッセージを実行します。一度に処理されるメッセージの数は、アクターのディスパッチャーのthroughput設定(デフォルトは5)によって決定されます。この処理量の後、スレッドはプールに戻され、継続は新しいタスクとしてディスパッチャーにキューに入れられます。したがって、上記のソリューションには2つの調整可能要素があります。

  • 1つのメッセージに対して複数のステップを処理します(処理ステップが非常に小さい場合)
  • throughputスループットを向上させ、公平性を低下させるために設定を増やします

元の問題では、おそらく数百のCPUを備えていない一般的なハードウェア上で、数百のそのようなアクターが実行されているようです。したがって、スループット設定は、各バッチが約1時間以内にかかるように設定する必要があります。10ms。

パフォーマンス評価

フィボナッチで少し遊んでみましょう:

Welcome to Scala version 2.10.0-RC1 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_07).
Type in expressions to have them evaluated.
Type :help for more information.

scala> def fib(x1: BigInt, x2: BigInt, steps: Int): (BigInt, BigInt) = if(steps>0) fib(x2, x1+x2, steps-1) else (x1, x2)
fib: (x1: BigInt, x2: BigInt, steps: Int)(BigInt, BigInt)

scala> def time(code: =>Unit) { val start = System.currentTimeMillis; code; println("code took " + (System.currentTimeMillis - start) + "ms") }
time: (code: => Unit)Unit

scala> time(fib(1, 1, 1000))
code took 1ms

scala> time(fib(1, 1, 1000))
code took 1ms

scala> time(fib(1, 1, 10000))
code took 5ms

scala> time(fib(1, 1, 100000))
code took 455ms

scala> time(fib(1, 1, 1000000))
code took 17172ms

これは、おそらく非常に最適化されたループでは、fib_100000に0.5秒かかることを意味します。それでは、俳優と少し遊んでみましょう。

scala> case class Cont(steps: Int, batch: Int)
defined class Cont

scala> val me = inbox()
me: akka.actor.ActorDSL.Inbox = akka.actor.dsl.Inbox$Inbox@32c0fe13

scala> val a = actor(new Act {
  var s: (BigInt, BigInt) = _
  become {
    case Cont(x, y) if y < 0 => s = (1, 1); self ! Cont(x, -y)
    case Cont(x, y) if x > 0 => s = fib(s._1, s._2, y); self ! Cont(x - 1, y)
    case _: Cont => me.receiver ! s
   }
})
a: akka.actor.ActorRef = Actor[akka://repl/user/$c]

scala> time{a ! Cont(1000, -1); me.receive(10 seconds)}
code took 4ms

scala> time{a ! Cont(10000, -1); me.receive(10 seconds)}
code took 27ms

scala> time{a ! Cont(100000, -1); me.receive(10 seconds)}
code took 632ms

scala> time{a ! Cont(1000000, -1); me.receive(30 seconds)}
code took 17936ms

これはすでに興味深いことです。ステップごとに十分な時間が与えられると(最後の行の舞台裏に巨大なBigIntがあります)、俳優はそれほど余分なことはしません。次に、よりバッチ処理された方法で小さな計算を実行するとどうなるかを見てみましょう。

scala> time{a ! Cont(10000, -10); me.receive(30 seconds)}
code took 462ms

これは、上記の直接バリアントの結果にかなり近いものです。

結論

自分自身にメッセージを送信することは、ほとんどすべてのアプリケーションにとって高価ではありません。実際の処理ステップを数百ナノ秒よりわずかに大きくしてください。

于 2012-10-13T04:10:55.193 に答える
4

Roland Kuhnの回答に対するあなたのコメントから、少なくともブロックでは再帰的と見なすことができるいくつかの作業があると思います。そうでない場合、問題を処理するためのクリーンな解決策はないと思います。複雑なパターンマッチングブロックを処理する必要があります。

私の仮定が正しければ、計算を非同期的にスケジュールし、アクターが他のメッセージに自由に答えられるようにします。重要なポイントは、Future monadic機能を使用し、単純な受信ブロックを使用することです。3つのメッセージ(startComputation、changeState、getState)を処理する必要があります

最終的に次の受信が発生します。

def receive {
  case StartComputation(myData) =>expensiveStuff(myData)
  case ChangeState(newstate) = this.state = newstate
  case GetState => sender ! this.state
}

次に、独自の再帰マップを定義することにより、Futureでmapメソッドを活用できます。

 def mapRecursive[A](f:Future[A], handler: A => A, exitConditions: A => Boolean):Future[A] = {
    f.flatMap {  a=>
                 if (exitConditions(a))
                   f
                 else {
                     val newFuture = f.flatMap{ a=> Future(handler(a))}
                     mapRecursive(newFuture,handler,exitConditions)
                 }

              }
  }

このツールを入手すると、すべてが簡単になります。次の例を見ると:

def main(args:Array[String]){
    val baseFuture:Future[Int] = Promise.successful(64)
    val newFuture:Future[Int] = mapRecursive(baseFuture,
                                 (a:Int) => {
                                   val result = a/2
                                   println("Additional step done: the current a is " + result)
                                   result
                                 }, (a:Int) => (a<=1))

    val one = Await.result(newFuture,Duration.Inf)
    println("Computation finished, result = " + one)



  }

その出力は次のとおりです。

追加の手順が実行されました:現在のaは32です

追加の手順が実行されました:現在のaは16です

追加の手順が実行されました:現在のaは8です

追加の手順が実行されました:現在のaは4です

追加の手順が実行されました:現在のaは2です

追加の手順が実行されました:現在のaは1です

計算が終了しました、結果= 1

expensiveStuffメソッド内で同じことができることを理解しています

  def expensiveStuff(myData:MyData):Future[MyData]= {
    val firstResult = Promise.successful(myData)
    val handler : MyData => MyData = (myData) => {
      val result = myData.copy(myData.value/2)
      self ! ChangeState(result)
      result
    }
    val exitCondition : MyData => Boolean = (myData:MyData) => myData.value==1
    mapRecursive(firstResult,handler,exitCondition)
  }

編集-より詳細

メールボックスからのメッセージをスレッドセーフで同期的に処理するアクターをブロックしたくない場合は、別のスレッドで計算を実行するしかありません。これはまさに高性能のノンブロッキング受信です。

しかし、あなたは私が提案するアプローチは高いパフォーマンスのペナルティを支払うと言ったのは正しかったです。すべてのステップは異なる未来で行われますが、それはまったく必要ないかもしれません。したがって、ハンドラーを再帰的に実行して、シングルスレッドまたはマルチスレッドの実行を取得できます。結局のところ、魔法の公式はありません。

  • 非同期でスケジュールを設定し、コストを最小限に抑えたい場合は、すべての作業を単一のスレッドで実行する必要があります
  • ただし、これにより、他の作業を開始できなくなる可能性があります。これは、スレッドプール上のすべてのスレッドが取得されると、futureがキューに入れられるためです。したがって、操作を複数の先物に分割して、完全に使用している場合でも、古い作業が完了する前に新しい作業をスケジュールできるようにすることができます。

def recurseFuture[A](entryFuture: Future[A], handler: A => A, exitCondition: A => Boolean, maxNestedRecursion: Long = Long.MaxValue): Future[A] = {
        def recurse(a:A, handler: A => A, exitCondition: A => Boolean, maxNestedRecursion: Long, currentStep: Long): Future[A] = {
          if (exitCondition(a))
            Promise.successful(a)
          else
            if (currentStep==maxNestedRecursion)
              Promise.successful(handler(a)).flatMap(a => recurse(a,handler,exitCondition,maxNestedRecursion,0))
            else{
              recurse(handler(a),handler,exitCondition,maxNestedRecursion,currentStep+1)
            }
        }
        entryFuture.flatMap { a => recurse(a,handler,exitCondition,maxNestedRecursion,0) }
      }

テスト目的でハンドラーメソッドを拡張しました。

  val handler: Int => Int = (a: Int) => {
      val result = a / 2
      println("Additional step done: the current a is " + result + " on thread " + Thread.currentThread().getName)
      result
    }

アプローチ1:ハンドラー自体を再帰的に実行して、すべてを単一のスレッドで実行できるようにします。

    println("Starting strategy with all the steps on the same thread")
    val deepestRecursion: Future[Int] = recurseFuture(baseFuture,handler, exitCondition)
    Await.result(deepestRecursion, Duration.Inf)
    println("Completed strategy with all the steps on the same thread")
    println("")

アプローチ2:ハンドラー自体の限られた深さで再帰します

println("Starting strategy with the steps grouped by three")
val threeStepsInSameFuture: Future[Int] = recurseFuture(baseFuture,handler, exitCondition,3)
val threeStepsInSameFuture2: Future[Int] = recurseFuture(baseFuture,handler, exitCondition,4)
Await.result(threeStepsInSameFuture, Duration.Inf)
Await.result(threeStepsInSameFuture2, Duration.Inf)
println("Completed strategy with all the steps grouped by three")
executorService.shutdown()
于 2012-10-15T12:23:24.650 に答える
2

アクターコードを実行することになっているスレッドをブロックするため、長時間実行される計算を行うためにアクターを使用しないでください。

計算に別のThread/ThreadPoolを使用し、AtomicReferencesを使用して中間結果を次の擬似コードの行に格納/クエリする設計を試してみます。

val cancelled = new AtomicBoolean(false)
val intermediateResult = new AtomicReference[IntermediateResult]()

object WorkerThread extends Thread {
  override def run {
    while(!cancelled.get) {
      intermediateResult.set(computationStep(intermediateResult.get))
    }
  }
}

loop {
  react {
    case StartComputation => WorkerThread.start()
    case CancelComputation => cancelled.set(true)
    case GetCurrentResult => sender ! intermediateResult.get
  }
}
于 2012-10-12T10:32:01.820 に答える
1

これは典型的な並行性の問題です。いくつかのルーチン/アクター(またはそれらを呼び出したいもの)が必要です。コードはほとんど正しいGoであり、コンテキストにわいせつに長い変数名が付いています。最初のルーチンは、クエリと中間結果を処理します。

func serveIntermediateResults(
        computationChannel chan *IntermediateResult,
        queryChannel chan chan<-*IntermediateResult) {
    var latestIntermediateResult *IntermediateResult // initial result
    for {
        select {
        // an update arrives
        case latestIntermediateResult, notClosed := <-computationChannel:
            if !notClosed {
                // the computation has finished, stop checking
                computationChannel = nil
            }
        // a query arrived
        case queryResponseChannel, notClosed := <-queryChannel:
            if !notClosed {
                // no more queries, so we're done
                return
            }
            // respond with the latest result
            queryResponseChannel<-latestIntermediateResult
        }
    }
}

長い計算では、必要に応じて中間結果を更新します。

func longComputation(intermediateResultChannel chan *IntermediateResult) {
    for notFinished {
        // lots of stuff
        intermediateResultChannel<-currentResult
    }
    close(intermediateResultChannel)
}

最後に、現在の結果を尋ねるために、これをうまくするためのラッパーがあります。

func getCurrentResult() *IntermediateResult {
     responseChannel := make(chan *IntermediateResult)
     // queryChannel was given to the intermediate result server routine earlier
     queryChannel<-responseChannel
     return <-responseChannel
}
于 2012-10-12T05:32:18.093 に答える