与えられたミリ秒内に同じタスクを可能な限り何度も実行する必要があります。そこで、以下のコードのように scala.actors.threadpool.Executors を使用して作成しました。
import actors.threadpool.{TimeUnit, Future, Executors}
import collection.mutable.ListBuffer
object TaskRepeater {
def repeat(task: Runnable, millisecs: Long): Int = {
val exector = Executors.newCachedThreadPool
val remover = Executors.newCachedThreadPool
val queue = ListBuffer[Future]()
def removeDone() {
(queue filter (_.isDone)) foreach { f =>
queue.remove(queue indexOf f)
}
}
val lock = new EasyLock()
val maxQueueSize = scala.collection.parallel.availableProcessors
val queueAvaiable = lock.mkCondition(queue.size < maxQueueSize)
var cnt = 0
val start = System.nanoTime()
while(System.nanoTime() - start < millisecs * 1000000) {
lock {
queueAvaiable.waitUntilFulfiled()
cnt += 1
val r = exector.submit(task)
queue += r
remover.submit(runneble {
r.get()
lock {
removeDone()
queueAvaiable.signalIfFulfilled()
}
})
}
}
exector.shutdown()
remover.shutdown()
assert(exector.awaitTermination(1, TimeUnit.SECONDS))
cnt
}
def runneble(f: => Unit) = new Runnable {
def run() {
f
}
}
}
import actors.threadpool.locks.{Condition, ReentrantLock}
class EasyLock {
val lock = new ReentrantLock
def mkCondition(f: => Boolean): EasyCondition = {
new EasyCondition(lock.newCondition(), f)
}
def apply(f: => Unit) {
lock.lock()
try {
f
} finally {
lock.unlock()
}
}
}
class EasyCondition(c: Condition, condition: => Boolean) {
def waitUntilFulfiled(f: => Unit) {
while (!condition) {
c.await()
}
f
}
def signalAllIfFulfilled() {
if (condition) c.signalAll()
}
def signalIfFulfilled() {
if (condition) c.signal()
}
}
しかし、これは少し複雑です。代わりに、Twitter の Broker と、Broker に接続して完了したタスクを Broker に送信する Executor (存在する場合) を使用すると、作業が簡単になると思います。
以下は疑似コードです
val b = new Broker
val e = new Executor // connects with the broker anyway
e.runTask
val o = b.recv
o.sync() // wait until the task finishes
このようなエグゼクターはありますか?