0

与えられたミリ秒内に同じタスクを可能な限り何度も実行する必要があります。そこで、以下のコードのように scala.actors.threadpool.Executors を使用して作成しました。

import actors.threadpool.{TimeUnit, Future, Executors}
import collection.mutable.ListBuffer

object TaskRepeater {

  def repeat(task: Runnable, millisecs: Long): Int = {
    val exector = Executors.newCachedThreadPool
    val remover = Executors.newCachedThreadPool

    val queue = ListBuffer[Future]()
    def removeDone() {
      (queue filter (_.isDone)) foreach { f =>
        queue.remove(queue indexOf f)
      }
    }

    val lock = new EasyLock()

    val maxQueueSize = scala.collection.parallel.availableProcessors

    val queueAvaiable = lock.mkCondition(queue.size < maxQueueSize)

    var cnt = 0

    val start = System.nanoTime()

    while(System.nanoTime() - start < millisecs * 1000000) {
      lock {
        queueAvaiable.waitUntilFulfiled()

        cnt += 1
        val r = exector.submit(task)
        queue += r

        remover.submit(runneble {
          r.get()
          lock {
            removeDone()
            queueAvaiable.signalIfFulfilled()
          }
        })
      }
    }

    exector.shutdown()
    remover.shutdown()

    assert(exector.awaitTermination(1, TimeUnit.SECONDS))

    cnt
  }

  def runneble(f: => Unit) = new Runnable {
    def run() {
      f
    }
  }
}

import actors.threadpool.locks.{Condition, ReentrantLock}

class EasyLock {
  val lock = new ReentrantLock

  def mkCondition(f: => Boolean): EasyCondition = {
    new EasyCondition(lock.newCondition(), f)
  }

  def apply(f: => Unit) {
    lock.lock()
    try {
      f
    } finally {
      lock.unlock()
    }
  }
}

class EasyCondition(c: Condition, condition: => Boolean) {

  def waitUntilFulfiled(f: => Unit) {
    while (!condition) {
      c.await()
    }
    f
  }

  def signalAllIfFulfilled() {
    if (condition) c.signalAll()
  }

  def signalIfFulfilled() {
    if (condition) c.signal()
  }
}

しかし、これは少し複雑です。代わりに、Twitter の Broker と、Broker に接続して完了したタスクを Broker に送信する Executor (存在する場合) を使用すると、作業が簡単になると思います。

以下は疑似コードです

val b = new Broker
val e = new Executor // connects with the broker anyway
e.runTask
val o = b.recv
o.sync() // wait until the task finishes

このようなエグゼクターはありますか?

4

1 に答える 1

0

私があなたを正しく理解しているなら、あなたはFuturePoolこのように使うことができるかもしれません:

val b = new Broker[T]
val futurePool = FuturePool.defaultPool // or some other pool
futurePool(task).onSuccess { b ! _ }
于 2012-10-30T07:20:46.540 に答える