4

map私は最近、返される関数がFuture[T]順次実行されているマシンで奇妙な動作に遭遇しました。これと同じ問題は他のマシンでは発生しません。作業は期待どおりにインターリーブされます。これは、Scala が少し賢すぎてExecutionContext、マシンのリソース (1 つのコア、1 つのワーカー) に一致するものを選択したためである可能性が高いことを後で発見しました。

問題を再現する簡単なコードを次に示します。

import scala.concurrent._
import scala.concurrent.duration._

val ss = List("the", "quick", "brown", "fox",
              "jumped", "over", "the", "lazy", "dog")

def r(s: String) : Future[String] = future {
        for (i <- 1 to 10) yield {
                println(String.format("r(%s) waiting %s more seconds...",
                        s, (10 - i).toString))
                Thread.sleep(1000)

        }
        s.reverse
}

val f_revs = ss.map { r(_) }

println("Look ma, no blocking!")

val rev = f_revs.par.map { Await.result(_, Duration.Inf) }.mkString(" ")

println(rev)

奇妙な動作を示すマシンでこれを実行すると、次のような順次出力が生成されます。

Look ma, no blocking!
r(the) waiting 9 more seconds...
r(the) waiting 8 more seconds...
r(the) waiting 7 more seconds...

カスタムの提供ExecutionContext:

val pool = Executors.newFixedThreadPool(1)
implicit val ec = ExecutionContext.fromExecutor(pool)

スレッドがこのマシンでインターリーブできるようにします。しかし、ここで新たな問題が発生しました。スレッド プールがシャットダウンせず、プログラムが無期限に停止します。どうやら、これはs の予想される動作であり、どこかFixedThreadPoolに置くことでシャットダウンできます。pool.shutdown()

私を頑固と呼んでください。しかし、スレッド プールにシャットダウンを指示する必要はありません。デフォルトのプールと同じように、すべてのキューが空になったときに (おそらく少し遅れて) シャットダウンするようにプールを構成する方法はありますか? ExecutionContextドキュメントに目を通しましたが、探しているものが見つかりません。

4

2 に答える 2

2

Scala は、Java のものとは異なる動作をする独自の fork-join 実装を使用するため、デフォルトExecutionContextと で作成したものとの間で動作が異なりますExecutors

必要なことを行う簡単な方法は、次のシステム プロパティを設定してデフォルトを構成することですExecutionContext

  1. scala.concurrent.context.minThreads最小数のスレッドを課します。デフォルトは1です。
  2. scala.concurrent.context.numThreads糸の数を設定します。デフォルトはx1です。
  3. scala.concurrent.context.maxThreadsスレッドの最大数を課します。デフォルトはx1です。

これらのそれぞれはx、プロセッサ数の倍数を示すために、数値または が前に付いた数値のいずれかです。スレッド数を増やすには、 と の両方を変更する必要がnumThreadsありmaxThreadsます。あなたの場合、両方をに設定するx2とうまくいくはずです。

于 2014-06-11T19:15:00.373 に答える
1

Java 7 には追加ExecutorServiceの s、特に、私が望むことを行うForkJoinPoolがあるようです (つまり、shutdown()プールは必要ありません)。

私が望むものを達成するには、プールを次のように変更するだけで十分です。

val pool = new java.util.concurrent.ForkJoinPool(5)

Java 8 には明らかにさらに多くのサービスがあります。

于 2014-06-11T19:02:15.480 に答える