11

Seq[Future[X]] をEnumerator[X]に変える方法はありますか? ユースケースは、Web をクロールしてリソースを取得したいというものです。これは Future の Sequence を返します。先物を最初に終了した順序で Iteratee にプッシュする Enumerator を返したいと思います。

これを行うには、Victor Klang のFuture select gistを使用できるようですが、かなり非効率に見えます。

注: 問題の Iteratees と Enumerator は、play フレームワークのバージョン 2.x によって提供されるものです。つまり、次のインポートを使用します。import play.api.libs.iteratee._

4

7 に答える 7

3

Victor Klang の select メソッドを使用:

  /**
   * "Select" off the first future to be satisfied.  Return this as a
   * result, with the remainder of the Futures as a sequence.
   *
   * @param fs a scala.collection.Seq
   */
  def select[A](fs: Seq[Future[A]])(implicit ec: ExecutionContext): 
      Future[(Try[A], Seq[Future[A]])] = {
    @scala.annotation.tailrec
    def stripe(p: Promise[(Try[A], Seq[Future[A]])],
               heads: Seq[Future[A]],
               elem: Future[A],
               tail: Seq[Future[A]]): Future[(Try[A], Seq[Future[A]])] = {
      elem onComplete { res => if (!p.isCompleted) p.trySuccess((res, heads ++ tail)) }
      if (tail.isEmpty) p.future
      else stripe(p, heads :+ elem, tail.head, tail.tail)
    }
    if (fs.isEmpty) Future.failed(new IllegalArgumentException("empty future list!"))
    else stripe(Promise(), fs.genericBuilder[Future[A]].result, fs.head, fs.tail)
   }
}

その後、必要なものを取得できます

    Enumerator.unfoldM(initialSeqOfFutureAs){ seqOfFutureAs =>
        if (seqOfFutureAs.isEmpty) {
          Future(None)
        } else {
          FutureUtil.select(seqOfFutureAs).map {
            case (t, seqFuture) => t.toOption.map {
              a => (seqFuture, a)
            }
          }
        }
    }

于 2013-03-21T11:13:07.893 に答える
2

より良く、より短く、より効率的な答えは次のとおりです。

   def toEnumerator(seqFutureX: Seq[Future[X]]) = new Enumerator[X] { 
      def apply[A](i: Iteratee[X, A]): Future[Iteratee[X, A]] = {
        Future.sequence(seqFutureX).flatMap { seqX: Seq[X] => 
            seqX.foldLeft(Future.successful(i)) {
              case (i, x) => i.flatMap(_.feed(Input.El(x)))
            }
        }
      }
    }

于 2013-03-23T17:36:20.720 に答える
1

私は質問がすでに少し古いことを認識していますが、Santhoshの回答と組み込みの Enumterator.enumerate() 実装に基づいて、次のことを思いつきました:

def enumerateM[E](traversable: TraversableOnce[Future[E]])(implicit ec: ExecutionContext): Enumerator[E] = {
  val it = traversable.toIterator
  Enumerator.generateM {
    if (it.hasNext) {
      val next: Future[E] = it.next()
      next map {
        e => Some(e)
      }
    } else {
      Future.successful[Option[E]] {
        None
      }
    }
  }
}

最初の Viktor 選択ベースのソリューションとは異なり、これは順序を保持しますが、すべての計算を非同期で開始できることに注意してください。したがって、たとえば、次のことができます。

// For lack of a better name
def mapEachM[E, NE](eventuallyList: Future[List[E]])(f: E => Future[NE])(implicit ec: ExecutionContext): Enumerator[NE] =
  Enumerator.flatten(
    eventuallyList map { list =>
      enumerateM(list map f)
    }
  )

この後者の方法は、実際、このスレッドに出くわしたときに探していたものでした. それが誰かを助けることを願っています! :)

于 2014-06-26T07:53:57.477 に答える
0

Java Executor Completement Service( JavaDoc )を使用して構築できます。アイデアは、新しい先物のシーケンスを作成することを使用し、それぞれExecutorCompletionService.take()が次の結果を待つために使用することです。前の未来がその結果をもたらすとき、それぞれの未来は始まります。

ただし、多くの同期が舞台裏で行われているため、これはそれほど効率的ではない可能性があることに注意してください。計算に並列マップリデュースを使用し(たとえば、ScalaのParSeqを使用)、列挙子に完全な結果を待たせる方が効率的かもしれません。

于 2013-03-21T11:32:05.457 に答える
0

警告: 回答する前にコンパイルされていません

このようなものはどうですか:

def toEnumerator(seqFutureX: Seq[Future[X]]) = new Enumerator[X] { 
  def apply[A](i: Iteratee[X, A]): Future[Iteratee[X, A]] = 
    Future.fold(seqFutureX)(i){ case (i, x) => i.flatMap(_.feed(Input.El(x)))) }
}
于 2013-03-28T16:24:44.890 に答える
0

ブロードキャストの使用を提案したい

def seqToEnumerator[A](futuresA: Seq[Future[A]])(defaultValue: A, errorHandler: Throwable => A): Enumerator[A] ={
    val (enumerator, channel) = Concurrent.broadcast[A]
    futuresA.foreach(f => f.onComplete({
      case Success(Some(a: A)) => channel.push(a)
      case Success(None) => channel.push(defaultValue)
      case Failure(exception) => channel.push(errorHandler(exception))
    }))
    enumerator
  }

errorHandling と defaultValues を追加しましたが、onComplete の代わりに onSuccess または onFailure を使用してこれらをスキップできます

于 2014-11-12T02:12:24.947 に答える
0

ここで便利なものを見つけました、

def unfold[A,B](xs:Seq[A])(proc:A => Future[B])(implicit errorHandler:Throwable => B):Enumerator[B] = {
    Enumerator.unfoldM (xs) { xs =>
        if (xs.isEmpty) Future(None)
        else proc(xs.head) map (b => Some(xs.tail,b)) recover {
            case e => Some((xs.tail,errorHandler(e)))
        }
    }
}

def unfold[A,B](fxs:Future[Seq[A]])(proc:A => Future[B]) (implicit errorHandler1:Throwable => Seq[A], errorHandler:Throwable => B) :Enumerator[B] = {

    (unfold(Seq(fxs))(fxs => fxs)(errorHandler1)).flatMap(unfold(_)(proc)(errorHandler))
}

def unfoldFutures[A,B](xsfxs:Seq[Future[Seq[A]]])(proc:A => Future[B]) (implicit errorHandler1:Throwable => Seq[A], errorHandler:Throwable => B) :Enumerator[B] = {

    xsfxs.map(unfold(_)(proc)).reduceLeft((a,b) => a.andThen(b))
}
于 2014-01-03T16:09:13.650 に答える