Roland Kuhnの回答に対するあなたのコメントから、少なくともブロックでは再帰的と見なすことができるいくつかの作業があると思います。そうでない場合、問題を処理するためのクリーンな解決策はないと思います。複雑なパターンマッチングブロックを処理する必要があります。
私の仮定が正しければ、計算を非同期的にスケジュールし、アクターが他のメッセージに自由に答えられるようにします。重要なポイントは、Future monadic機能を使用し、単純な受信ブロックを使用することです。3つのメッセージ(startComputation、changeState、getState)を処理する必要があります
最終的に次の受信が発生します。
def receive {
case StartComputation(myData) =>expensiveStuff(myData)
case ChangeState(newstate) = this.state = newstate
case GetState => sender ! this.state
}
次に、独自の再帰マップを定義することにより、Futureでmapメソッドを活用できます。
def mapRecursive[A](f:Future[A], handler: A => A, exitConditions: A => Boolean):Future[A] = {
f.flatMap { a=>
if (exitConditions(a))
f
else {
val newFuture = f.flatMap{ a=> Future(handler(a))}
mapRecursive(newFuture,handler,exitConditions)
}
}
}
このツールを入手すると、すべてが簡単になります。次の例を見ると:
def main(args:Array[String]){
val baseFuture:Future[Int] = Promise.successful(64)
val newFuture:Future[Int] = mapRecursive(baseFuture,
(a:Int) => {
val result = a/2
println("Additional step done: the current a is " + result)
result
}, (a:Int) => (a<=1))
val one = Await.result(newFuture,Duration.Inf)
println("Computation finished, result = " + one)
}
その出力は次のとおりです。
追加の手順が実行されました:現在のaは32です
追加の手順が実行されました:現在のaは16です
追加の手順が実行されました:現在のaは8です
追加の手順が実行されました:現在のaは4です
追加の手順が実行されました:現在のaは2です
追加の手順が実行されました:現在のaは1です
計算が終了しました、結果= 1
expensiveStuff
メソッド内で同じことができることを理解しています
def expensiveStuff(myData:MyData):Future[MyData]= {
val firstResult = Promise.successful(myData)
val handler : MyData => MyData = (myData) => {
val result = myData.copy(myData.value/2)
self ! ChangeState(result)
result
}
val exitCondition : MyData => Boolean = (myData:MyData) => myData.value==1
mapRecursive(firstResult,handler,exitCondition)
}
編集-より詳細
メールボックスからのメッセージをスレッドセーフで同期的に処理するアクターをブロックしたくない場合は、別のスレッドで計算を実行するしかありません。これはまさに高性能のノンブロッキング受信です。
しかし、あなたは私が提案するアプローチは高いパフォーマンスのペナルティを支払うと言ったのは正しかったです。すべてのステップは異なる未来で行われますが、それはまったく必要ないかもしれません。したがって、ハンドラーを再帰的に実行して、シングルスレッドまたはマルチスレッドの実行を取得できます。結局のところ、魔法の公式はありません。
- 非同期でスケジュールを設定し、コストを最小限に抑えたい場合は、すべての作業を単一のスレッドで実行する必要があります
- ただし、これにより、他の作業を開始できなくなる可能性があります。これは、スレッドプール上のすべてのスレッドが取得されると、futureがキューに入れられるためです。したがって、操作を複数の先物に分割して、完全に使用している場合でも、古い作業が完了する前に新しい作業をスケジュールできるようにすることができます。
def recurseFuture[A](entryFuture: Future[A], handler: A => A, exitCondition: A => Boolean, maxNestedRecursion: Long = Long.MaxValue): Future[A] = {
def recurse(a:A, handler: A => A, exitCondition: A => Boolean, maxNestedRecursion: Long, currentStep: Long): Future[A] = {
if (exitCondition(a))
Promise.successful(a)
else
if (currentStep==maxNestedRecursion)
Promise.successful(handler(a)).flatMap(a => recurse(a,handler,exitCondition,maxNestedRecursion,0))
else{
recurse(handler(a),handler,exitCondition,maxNestedRecursion,currentStep+1)
}
}
entryFuture.flatMap { a => recurse(a,handler,exitCondition,maxNestedRecursion,0) }
}
テスト目的でハンドラーメソッドを拡張しました。
val handler: Int => Int = (a: Int) => {
val result = a / 2
println("Additional step done: the current a is " + result + " on thread " + Thread.currentThread().getName)
result
}
アプローチ1:ハンドラー自体を再帰的に実行して、すべてを単一のスレッドで実行できるようにします。
println("Starting strategy with all the steps on the same thread")
val deepestRecursion: Future[Int] = recurseFuture(baseFuture,handler, exitCondition)
Await.result(deepestRecursion, Duration.Inf)
println("Completed strategy with all the steps on the same thread")
println("")
アプローチ2:ハンドラー自体の限られた深さで再帰します
println("Starting strategy with the steps grouped by three")
val threeStepsInSameFuture: Future[Int] = recurseFuture(baseFuture,handler, exitCondition,3)
val threeStepsInSameFuture2: Future[Int] = recurseFuture(baseFuture,handler, exitCondition,4)
Await.result(threeStepsInSameFuture, Duration.Inf)
Await.result(threeStepsInSameFuture2, Duration.Inf)
println("Completed strategy with all the steps grouped by three")
executorService.shutdown()