次のコードは、ファイルを 1 行ずつ読み取り、各行のタスクを作成してから、executor のキューに入れます。エグゼキューターのキューがいっぱいになると、ファイルからの読み取りは、再びスペースができるまで停止します。
SOでいくつかの提案を見ましたが、それらはすべて、ファイルのコンテンツ全体をメモリに読み込むか、次善のスケジューリングを必要とします(たとえば、100行を読み取り、それらを並行して処理し、それが終了した後にのみ、次の100行を読み取ります) . また、これには Akka のようなライブラリも使用したくありません。
これらの欠点なしでこれを達成するScalaの方法は何ですか?
val exec = executorWithBoundedQueue()
val lines = Source.fromFile(sourceFile, cs).getLines()
lines.map {
l => exec.submit(new Callable[String] {
override def call(): String = doStuff(l)
})
}.foreach {
s => consume(s.get())
}
exec.shutdown()
の例示的な定義executorWithBoundedQueue
def executorWithBoundedQueue(): ExecutorService = {
val boundedQueue = new LinkedBlockingQueue[Runnable](1000)
new ThreadPoolExecutor(boundedQueue)
}