1

私は先物用の scala <-> Java 相互運用ラッパーを書いていますが、scala.concurrent.Future.onComplete を実装する正しい方法がわかりません ( http://www.scala-lang.org/api/current/index. html#scala.concurrent.Future )。これはおそらくうまくいきます:

def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
  executor.execute(new Runnable {
    @tailrec
    def run = value match {
      case Some(t) => func(t)
      case None => { Thread.sleep(100); run }
    }
  })
}

しかし、Future を使用した Scala の非同期 IO は、ブロックする必要がある場合は、コードの関連部分を scala.concurrent.blocking に渡して、ExecutionContext に状況を知らせる必要があることを示唆しています。問題は、値の match{...} をブロッキング {} で囲むと、末尾呼び出しではなくなることです。

これを行うためのことわざの正しい方法は何ですか?

編集:完全を期すために、ここにラッピングクラス全体があります:

class JavaFutureWrapper[T](val jf: java.util.concurrent.Future[T]) extends scala.concurrent.Future[T] {
  def isCompleted = jf.isDone

  def result(atMost: Duration)(implicit permit: CanAwait): T =
    atMost match { case Duration(timeout, units) => jf.get(timeout, units) }

  def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
    executor.execute(new Runnable {
      @tailrec
      def run = value match {
        case Some(t) => func(t)
        case None => { Thread.sleep(100); run }
      }
    })
  }

  def ready(atMost: Duration)(implicit permit: CanAwait): this.type = atMost match {
    case Duration(timeout, units) => {
      jf.get(timeout, units)
      this
    }
  }

  def value: Option[Try[T]] = (jf.isCancelled, jf.isDone) match {
    case (true, _) => Some(Failure(new Exception("Execution was cancelled!")))
    case (_, true) => Some(Success(jf.get))
    case _ => None
  }
}
4

2 に答える 2

2

Javaの未来が完了するのを待つだけです:

import scala.util.{Try, Success, Failure}
import scala.concurrent._
import java.util.concurrent.TimeUnit

class JavaFutureWrapper[T](val jf: java.util.concurrent.Future[T])
  extends scala.concurrent.Future[T] {
  ...

  def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit =
    executor.execute(new Runnable {
      def run: Unit = {
        val result = Try(blocking(jf.get(Long.MaxValue, TimeUnit.MILLISECONDS)))
        func(result)
      }
    })
  ...
}
于 2013-08-22T15:32:49.710 に答える
0

うーん、 0__ の回答に対する私の編集は承認されなかったので、将来の読者のために、ここで私が使用するソリューションを示します (これは 0__ から単純化されています)

def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
  executor.execute(new Runnable {
    def run = func(Try( blocking { jf.get } ))
  })
}
于 2013-08-22T16:19:28.193 に答える