7

分割統治(別名フォーク/結合)アプローチを使用して、数値計算の問題を解決しようとしています。コードは次のとおりです。

import scala.actors.Futures.future

private def compute( input: Input ):Result = {
  if( pairs.size < SIZE_LIMIT ) {
    computeSequential()
  } else {
    val (input1,input2) = input.split
    val f1 = future( compute(input1) )
    val f2 = future( compute(input2) )
    val result1 = f1()
    val result2 = f2()
    merge(result1,result2)
  }
}

それは(素晴らしいスピードアップで)実行されますが、将来のapplyメソッドはスレッドをブロックしているようで、スレッドプールは大幅に増加します。また、作成されるスレッドが多すぎると、計算がスタックします。

スレッドを解放する先物の一種の反応方法はありますか?または、その動作を実現する他の方法はありますか?

編集:私はscala2.8.0.finalを使用しています

4

1 に答える 1

8

自分のを主張(適用)しないでください。Futureこれにより、ブロックして回答を待つ必要があります。これまで見てきたように、これはデッドロックにつながる可能性があります。代わりに、それらを単調に使用して、完了したときに何をすべきかを伝えます。それ以外の:

val result1 = f1()
val result2 = f2()
merge(result1,result2)

これを試して:

for {
  result1 <- f1
  result2 <- f2
} yield merge(result1, result2)

この結果は、マージされた結果を含むResponder[Result](基本的には)になります。またはFuture[Result]を使用してこの最終値で効果的なことを行うことができます。または、別のにそれを行うことができます。ブロッキングは必要ありません。将来のために計算をスケジュールし続けるだけです。respond()foreach()map()flatMap()Responder[T]

編集1:

さて、関数のシグネチャは今computeに変更する必要がResponder[Result]ありますが、それは再帰呼び出しにどのように影響しますか?これを試してみましょう:

private def compute( input: Input ):Responder[Result] = {
  if( pairs.size < SIZE_LIMIT ) {
    future(computeSequential())
  } else {
    val (input1,input2) = input.split
    for {
      result1 <- compute(input1)
      result2 <- compute(input2)
    } yield merge(result1, result2)
  }
}

これで、呼び出しがすでに戻っているため、呼び出しをcomputeでラップする必要がなくなりました(のスーパークラス)。future(...)ResponderFuture

編集2:

この継続渡しスタイルを使用した結果の1つは、トップレベルのコード(最初に呼び出されたものが何であれ)がcomputeブロックされなくなることです。から呼び出されていてmain()、それがプログラムのすべてである場合、これは問題になります。これは、大量の先物を生成し、すぐにシャットダウンして、指示されたすべての処理を終了するためです。あなたがする必要があるのはblock、これらすべての先物についてですが、トップレベルで一度だけ、そしてすべての計算の結果だけであり、中間の計算ではありません。

残念ながら、Responderによって返されるこのものには、以前のようなcompute()ブロッキングapply()メソッドがありませんFuture。flatMappingが;の代わりにFutureジェネリックを生成する理由がわかりません。これはAPIの間違いのようです。しかし、いずれにせよ、あなたはあなた自身のものを作ることができるはずです:ResponderFuture

def claim[A](r:Responder[A]):A = {
  import java.util.concurrent.ArrayBlockingQueue
  import scala.actors.Actor.actor

  val q = new ArrayBlockingQueue[A](1)
  // uses of 'respond' need to be wrapped in an actor or future block
  actor { r.respond(a => q.put(a)) } 
  return q.take
}

mainこれで、次のようにメソッドで計算するブロッキング呼び出しを作成できます。

val finalResult = claim(compute(input))
于 2010-10-04T08:40:04.097 に答える