16

複数の先物を並行して実行する必要があり、プログラムがクラッシュまたはハングすることはありません。

今のところ、future を 1 つずつ待ち、TimeoutException が発生した場合はフォールバック値を使用します。

val future1 = // start future1
val future2 = // start future2
val future3 = // start future3

// <- at this point all 3 futures are running

// waits for maximum of timeout1 seconds
val res1 = toFallback(future1, timeout1, Map[String, Int]())
// .. timeout2 seconds 
val res2 = toFallback(future2, timeout2, List[Int]())
// ... timeout3 seconds
val res3 = toFallback(future3, timeout3, Map[String, BigInt]()) 

def toFallback[T](f: Future[T], to: Int, default: T) = {
  Try(Await.result(f, to seconds))
    .recover { case to: TimeoutException => default }
}

ご覧のとおり、このスニペットの最大待機時間はtimeout1 + timeout2 + timeout3

私の質問は次のとおりです。これらの先物すべてを一度に待機して、待機時間を に短縮するにはどうすればよいmax(timeout1, timeout2, timeout3)ですか?

編集:最後に、@Jatin と @senia の回答の変更を使用しました:

private def composeWaitingFuture[T](fut: Future[T], 
                                    timeout: Int, default: T) =
  future { Await.result(fut, timeout seconds) } recover {
    case e: Exception => default
  }

その後、次のように使用されます。

// starts futures immediately and waits for maximum of timeoutX seconds
val res1 = composeWaitingFuture(future1, timeout1, Map[String, Int]())
val res2 = composeWaitingFuture(future2, timeout2, List[Int]())
val res3 = composeWaitingFuture(future3, timeout3, Map[String, BigInt]()) 

// takes the maximum of max(timeout1, timeout2, timeout3) to complete
val combinedFuture =
  for {
    r1 <- res1
    r2 <- res2
    r3 <- res3
  } yield (r1, r2, r3)

後で、必要に応じて使用combinedFutureします。

4

7 に答える 7

14

または for-comprehensionfutureを使用して、3 つの先物すべての結果を返すように作成できます。flatMap

val combinedFuture =
  for {
    r1 <- future1
    r2 <- future2
    r3 <- future3
  } yield (r1, r2, r3)

val (r1, r2, r3) = Await.result(combinedFuture , Seq(timeout1, timeout2, timeout3).max)

使用している場合はakka、タイムアウト後にデフォルト値で将来を完了することができます:

implicit class FutureHelper[T](f: Future[T]) extends AnyVal{
  import akka.pattern.after
  def orDefault(t: Timeout, default: => T)(implicit system: ActorSystem): Future[T] = {
    val delayed = after(t.duration, system.scheduler)(Future.successful(default))
    Future firstCompletedOf Seq(f, delayed)
  }
}

val combinedFuture =
  for {
    r1 <- future1.orDefault(timeout1, Map())
    r2 <- future2.orDefault(timeout2, List())
    r3 <- future3.orDefault(timeout3, Map())
  } yield (r1, r2, r3)

val (r1, r2, r3) = Await.result(combinedFuture , allowance + Seq(timeout1, timeout2, timeout3).max)
于 2013-07-04T10:19:03.083 に答える
10
def toFallback[T](f: Future[T], to: Int, default: T) = {
  future{
  try{
        Await.result(f, to seconds)
   }catch{
        case e:TimeoutException => default
  }
 }

このブロックを非同期にすることもでき、各リクエストは最大時間待機します。スレッドが多すぎる場合は、Akka のsystem scheduler. @Senia は、これについて以下で回答しています。

于 2013-07-04T10:17:48.700 に答える
3

Await.resultブロックするためだけにスレッドを使用するため、使用は避けます。先物のタイムアウトを実装する 1 つのオプションは次のとおりです。

val timer = new Timer()

def toFallback[T](f: Future[T], timeout: Int, default: T) = {
  val p = Promise[T]()
  f.onComplete(result => p.tryComplete(result))
  timer.schedule(new TimerTask {
    def run() {
      p.tryComplete(Success(default))
    }
  }, timeout)
  p.future
}

これにより、future または指定されたタイムアウト後のデフォルトの結果のいずれか早いほうによって完了する promise が作成されます。

クエリを同時に実行するには、次のようにします。

val future1 = // start future1
val future2 = // start future2
val future3 = // start future3

val res1 = toFallback(future1, timeout1, Map[String, Int]())
val res2 = toFallback(future2, timeout2, List[Int]())
val res3 = toFallback(future3, timeout3, Map[String, BigInt]())

val resultF = for {
  r1 <- res1
  r2 <- res2
  r3 <- res3
} yield (r1, r2, r3)

val (r1, r2, r3) = Await.result(resultF, Duration.Inf)
println(s"$r1, $r2, $r3")

//or
resultF.onSuccess {
  case (r1, r2, r3) => println(s"$r1, $r2, $r3")
}
于 2013-07-04T10:19:43.760 に答える
0

これはちょっとハックかもしれませんが、単純に経過時間を測定し、それに応じてタイムアウトを変更することができます。仮定timeout1 <= timeout2 <= timeout3

def now     = System.currentTimeMillis();
val start   = now;
def remains(timeout: Long): Long
            = math.max(0, timeout + start - now)

def toFallback[T](f: Future[T], to: Int, default: T) = {
  Try(Await.result(f, remains(to) seconds))
    .recover { case to: TimeoutException => default }
}

このように、各タイムアウトはstart = now呼び出された瞬間に基づいているため、全体の実行時間は最大でもtimeout3. タイムアウトが指定されていない場合でも機能しますが、一部のタスクは指定されたタイムアウトよりも長く実行されたままになる可能性があります。

于 2013-07-04T11:47:07.203 に答える
0

デフォルトの例外キャプチャとリターンを実行するようにFuture 自分自身を取得してみませんか? 次に、各フューチャを順番に単純Awaitに処理でき、フューチャ外での例外処理について心配する必要はありません。

于 2013-07-04T10:42:00.947 に答える
0

Monix Taskを使用します。これは、強化された Future です。

import monix.execution.Scheduler.Implicits.global
import monix.eval._
import scala.concurrent.duration._

val task1 = Task{Thread.sleep(1);"task1"}.timeoutTo(timeout1,Task.now("timeout1"))
val task2 = Task{Thread.sleep(2);"task2"}.timeoutTo(timeout2,Task.now("timeout2"))
Task.zipList(Seq(task1,task2)).runSyncUnsafe(Duration.Inf)
于 2018-05-16T08:20:23.583 に答える