標準のscala.actorsパッケージを使用して、Scalaのディスパッチャー-ワーカーアクターパターンを設計しようとしています。
ディスパッチャはaから作業を受け取り、java.util.concurrent.LinkedBlockingQueue
それをワーカーアクターに送信して処理します。すべての作業が完了したら、ディスパッチャは各ワーカーに終了するように指示し、次に終了する必要があります。これが私が思いついたコードですが、すべての作業が完了するとハングします('GiveMeWork
ディスパッチャーのキューに保留中のメッセージがあると思います):
import java.util.concurrent.LinkedBlockingQueue
import scala.actors.Actor
object Dispatcher
extends Actor {
println("Dispatcher created")
def act() {
val workers = (1 to 4).map(id => (new Worker(id)).start())
loop {
react {
case 'GiveMeWork =>
// println("Worker asked for work")
val (time, i) = workQueue.take()
if (time == 0) {
println("Quitting time")
workers.foreach(_ !? 0L)
} else {
println("Arrival at dispatcher: i: " + i + " dispatch time: " +
time + ", elapsed: " + (System.nanoTime() - time))
sender ! time
}
case 'Quit =>
println("Told to quit")
sender ! 'OffDuty
exit()
}
}
}
}
class Worker(id: Int)
extends Actor {
println("Worker(" + id + ") created")
var jobs = 0
def act() {
Dispatcher ! 'GiveMeWork
loop {
react {
case time: Long =>
if (time == 0) {
println("Worker(" + id + ") completed " + jobs + " jobs")
sender ! 'OffDuty
exit()
} else {
println("Arrival at worker(" + id + "): dispatch time: " +
time + ", elapsed: " + (System.nanoTime() - time))
Thread.sleep(id)
jobs += 1
Dispatcher ! 'GiveMeWork
}
}
}
}
}
val workQueue = new LinkedBlockingQueue[(Long, Int)](1000)
Dispatcher.start()
for (i <- 0 until 5000) {
Thread.sleep(1)
workQueue.put((System.nanoTime(), i))
}
workQueue.put((0L, 0))
println("Telling Dispatcher to quit")
Dispatcher !? 'Quit