87

いくつかの先物があり、それらのいずれかが失敗するか、すべてが成功するまで待つ必要があるとします。

例: 3 つの先物があるとします: f1f2f3

  • f1成功して失敗した場合f2、待機しません(クライアントに失敗f3を返します)。

  • まだ実行中にf2失敗した場合、私はそれらを待ちません(そして失敗を返します)f1f3

  • f1が成功し、次に成功した場合、f2を待ち続けf3ます。

どのように実装しますか?

4

8 に答える 8

85

代わりに、次のように for-comprehension を使用できます。

val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}

val aggFut = for{
  f1Result <- fut1
  f2Result <- fut2
  f3Result <- fut3
} yield (f1Result, f2Result, f3Result)

この例では、先物 1、2、3 が並行して開始されます。次に、for 内包表記で、結果 1、2、3 が利用可能になるまで待ちます。1 または 2 のいずれかが失敗した場合、もう 3 を待ちません。3 つすべてが成功した場合、aggFutval は 3 つの Future の結果に対応する 3 つのスロットを持つタプルを保持します。

fut2 が最初に失敗した場合に待機を停止する動作が必要な場合は、少し複雑になります。上記の例では、fut2 が失敗したことに気付く前に、fut1 が完了するのを待つ必要があります。それを解決するには、次のようなことを試すことができます。

  val fut1 = Future{Thread.sleep(3000);1}
  val fut2 = Promise.failed(new RuntimeException("boo")).future
  val fut3 = Future{Thread.sleep(1000);3}

  def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
    val fut = if (futures.size == 1) futures.head._2
    else Future.firstCompletedOf(futures.values)

    fut onComplete{
      case Success(value) if (futures.size == 1)=> 
        prom.success(value :: values)

      case Success(value) =>
        processFutures(futures - value, value :: values, prom)

      case Failure(ex) => prom.failure(ex)
    }
    prom.future
  }

  val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
  aggFut onComplete{
    case value => println(value)
  }

これで正しく動作しますが、問題は、正常に完了しFutureたときに からどれを削除するかを知ることにあります。Map結果をその結果を生み出した Future と適切に関連付ける何らかの方法がある限り、このようなものは機能します。完了した Future を Map から再帰的に削除し、残りがなくなるまでFuture.firstCompletedOf残りの Future を呼び出しFutures、途中で結果を収集し続けます。きれいではありませんが、話している動作が本当に必要な場合は、これまたは同様のものが機能する可能性があります。

于 2013-04-27T23:14:19.607 に答える
35

promise を使用して、最初の失敗、または最後に完了した集約された成功のいずれかを送信できます。

def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
  val p = Promise[M[A]]()

  // the first Future to fail completes the promise
  in.foreach(_.onFailure{case i => p.tryFailure(i)})

  // if the whole sequence succeeds (i.e. no failures)
  // then the promise is completed with the aggregated success
  Future.sequence(in).foreach(p trySuccess _)

  p.future
}

次に、ブロックしたい場合Awaitはその結果、またはそれを別のものにすることができます。Futuremap

for Comprehension との違いは、ここでは最初に失敗したエラーが発生するのに対し、for Comprehension では入力コレクションの走査順序で最初のエラーが発生することです (別のコレクションが最初に失敗した場合でも)。例えば:

val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }

Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)}
// this waits one second, then prints "java.lang.ArithmeticException: / by zero"
// the first to fail in traversal order

と:

val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }

sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)}
// this immediately prints "java.util.NoSuchElementException: None.get"
// the 'actual' first to fail (usually...)
// and it returns early (it does not wait 1 sec)
于 2013-04-27T23:57:16.820 に答える
8

これは、アクターを使用しないソリューションです。

import scala.util._
import scala.concurrent._
import java.util.concurrent.atomic.AtomicInteger

// Nondeterministic.
// If any failure, return it immediately, else return the final success.
def allSucceed[T](fs: Future[T]*): Future[T] = {
  val remaining = new AtomicInteger(fs.length)

  val p = promise[T]

  fs foreach {
    _ onComplete {
      case s @ Success(_) => {
        if (remaining.decrementAndGet() == 0) {
          // Arbitrarily return the final success
          p tryComplete s
        }
      }
      case f @ Failure(_) => {
        p tryComplete f
      }
    }
  }

  p.future
}
于 2013-04-27T22:15:24.883 に答える
6

この目的のために、Akka アクターを使用します。for-comprehension とは異なり、future のいずれかが失敗するとすぐに失敗するため、その意味では少し効率的です。

class ResultCombiner(futs: Future[_]*) extends Actor {

  var origSender: ActorRef = null
  var futsRemaining: Set[Future[_]] = futs.toSet

  override def receive = {
    case () =>
      origSender = sender
      for(f <- futs)
        f.onComplete(result => self ! if(result.isSuccess) f else false)
    case false =>
      origSender ! SomethingFailed
    case f: Future[_] =>
      futsRemaining -= f
      if(futsRemaining.isEmpty) origSender ! EverythingSucceeded
  }

}

sealed trait Result
case object SomethingFailed extends Result
case object EverythingSucceeded extends Result

次に、アクターを作成し、メッセージを送信して (応答の送信先を認識できるように)、応答を待ちます。

val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3)))
try {
  val f4: Future[Result] = actor ? ()
  implicit val timeout = new Timeout(30 seconds) // or whatever
  Await.result(f4, timeout.duration).asInstanceOf[Result] match {
    case SomethingFailed => println("Oh noes!")
    case EverythingSucceeded => println("It all worked!")
  }
} finally {
  // Avoid memory leaks: destroy the actor
  actor ! PoisonPill
}
于 2013-04-27T21:28:00.530 に答える
5

先物だけでこれを行うことができます。これが1つの実装です。実行が早期に終了しないことに注意してください。その場合は、より洗練された処理を行う必要があります (おそらく、自分で割り込みを実装する必要があります)。しかし、うまくいかないことを待ち続けたくない場合は、最初のことが終わるのを待ち続け、何も残っていないか、例外に遭遇したときに停止することが重要です。

import scala.annotation.tailrec
import scala.util.{Try, Success, Failure}
import scala.concurrent._
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global

@tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): 
Either[Throwable, Seq[A]] = {
  val first = Future.firstCompletedOf(fs)
  Await.ready(first, Duration.Inf).value match {
    case None => awaitSuccess(fs, done)  // Shouldn't happen!
    case Some(Failure(e)) => Left(e)
    case Some(Success(_)) =>
      val (complete, running) = fs.partition(_.isCompleted)
      val answers = complete.flatMap(_.value)
      answers.find(_.isFailure) match {
        case Some(Failure(e)) => Left(e)
        case _ =>
          if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done)
          else Right( answers.map(_.get) ++: done )
      }
  }
}

すべてが正常に機能する場合の動作例を次に示します。

scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
  Future{ Thread.sleep(1000); println("Fancy meeting you here!") },
  Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
Fancy meeting you here!
Bye!
res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))

しかし、何か問題が発生した場合:

scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
  Future{ Thread.sleep(1000); throw new Exception("boo"); () }, 
  Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo)

scala> Bye!
于 2013-04-27T23:12:42.377 に答える
5

Twitter の Future API をチェックアウトすることをお勧めします。特に Future.collect メソッド。それはまさにあなたが望むことをします: https://twitter.github.io/scala_school/finagle.html

ソース コード Future.scala は、 https ://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala から入手できます。

于 2014-12-13T23:05:11.227 に答える