2

オーバーヘッドがあまりない、scala の別のスレッドで非常に単純なタスクを実行する方法があるかどうか疑問に思っていました。

基本的に、任意の数のタスクの実行を処理できるグローバルな「エグゼキューター」を作成したいと考えています。その後、executor を使用して、追加の構成を構築できます。

さらに、クライアントがブロッキングまたは非ブロッキングの考慮事項を考慮する必要がなければよいでしょう。

Scala アクター ライブラリは Doug Lea FJ のものの上に構築されており、私が達成しようとしていることをある程度サポートしていることも知っています。ただし、私の理解では、「アクター プール」を事前に割り当てて達成する必要があります。

このためのグローバル スレッド プールを作成することは避けたいと思います。

簡単な例を次に示します。

import concurrent.SyncVar
object SimpleExecutor {
  import actors.Actor._
  def exec[A](task:  => A) : SyncVar[A] = {
    //what goes here?
    //This is what I currently have
    val x = new concurrent.SyncVar[A]
    //The overhead of making the actor appears to be a killer
    actor {
      x.set(task)
    }
    x
  }
  //Not really sure what to stick here
  def execBlocker[A](task: => A) : SyncVar[A] = exec(task)

}

次に、exec の使用例を示します。

object Examples {
  //Benchmarks a task
  def benchmark(blk : => Unit) = {
    val start = System.nanoTime
    blk
    System.nanoTime - start
  }

  //Benchmarks and compares 2 tasks
  def cmp(a: => Any, b: => Any) = {
    val at = benchmark(a)
    val bt = benchmark(b)
    println(at + " " + bt + " " +at.toDouble / bt)
  }

  //Simple example for simple non blocking comparison
  import SimpleExecutor._
  def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

  //Simple example for the blocking performance
  import Thread.sleep
  def paraSle(hi : Int) = (0 until hi) map (i=>exec(sleep(i))) foreach (_.get)
  def singSle(hi : Int) = (0 until hi) foreach (i=>sleep(i))
}

最後にサンプルを実行します (HotSpot がウォームアップできるように、数回実行する必要があるかもしれません):

import Examples._
cmp(paraAdd(10000), singAdd(10000))
cmp(paraSle(100), singSle(100))
4

1 に答える 1

8

そのFuturesために作られました。ただ、新しい先物を作成するためimport scala.actors.Futures._に使用し、結果をしばらく待つ、または結果が受信されるまでブロックする、準備ができているかどうかを確認するなどの方法を使用します。futureawaitAllapplyrespondisSet

スレッド プールを作成する必要もありません。または、少なくとも、通常はそうではありません。なぜそう思うのですか?

編集

整数の加算のような単純なものを並列化してもパフォーマンスは向上しません。関数呼び出しよりも高速だからです。並行性は、I/O のブロックによる時間の損失を回避し、複数の CPU コアを使用してタスクを並行して実行することによってのみ、パフォーマンスを向上させます。後者の場合、タスクは、ワークロードを分割して結果をマージするコストを相殺するのに十分なほど計算コストが高くなければなりません。

並行性を採用するもう 1 つの理由は、アプリケーションの応答性を向上させることです。それは高速化ではなく、ユーザーへの応答を高速化することです。それを行う 1 つの方法は、比較的高速な操作を別のスレッドにオフロードして、ユーザーが見たり行ったりすることを処理するスレッドを高速化することです。しかし、私は脱線します。

コードに重大な問題があります:

  def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

または、先物に変換すると、

  def paraAdd(hi: Int) = (0 until hi) map (i=>future(i+5)) foreach (_.apply)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

paraAdd並列でタスクを実行していると思うかもしれませんが、そうではありません。Range非厳密な実装があるためですmap(これは Scala 2.7 までです。Scala 2.8.0 以降Rangeは厳密です)。他の Scala の質問で調べることができます。何が起こるかは次のとおりです。

  1. 0からまでの範囲が作成されますhi
  2. 範囲射影は、範囲の各要素 i から、future(i+5)呼び出されたときに返される関数に作成されます。
  3. 範囲射影 ( の各要素についてi => future(i+5))、要素が評価され (foreach厳密)、関数applyが呼び出されます。

したがって、futureはステップ 2 では呼び出され、ステップ 3 でのみ呼び出されるため、それぞれfutureが完了するのを待ってから次のステップを実行します。次の方法で修正できます。

  def paraAdd(hi: Int) = (0 until hi).force map (i=>future(i+5)) foreach (_.apply)

これによりパフォーマンスが向上しますが、単純な即時追加ほど優れたものではありません. 一方、次のようにするとします。

def repeat(n: Int, f: => Any) = (0 until n) foreach (_ => f)
def paraRepeat(n: Int, f: => Any) = 
  (0 until n).force map (_ => future(f)) foreach (_.apply)

そして比較します:

cmp(repeat(100, singAdd(100000)), paraRepeat(100, singAdd(100000)))

利益が見られるようになるかもしれません (コアの数とプロセッサの速度によって異なります)。

于 2009-09-02T21:52:24.220 に答える