2

Play 2 で Iteratees を使用して彗星の結果をストリーミングする方法をつかもうとしています。コールバックから列挙子を作成し、マップから列挙子を作成するハンドルを取得しました。私の問題は Enumeratee.map にあります。これは、純粋な入力を受け取り、純粋な出力を返す関数を受け取ります (例: docの String から Int への変換)。私がやりたいことは、純粋な入力を取り、結果の約束を返すことです。結局、列挙子は enumeratee に promise を供給し、enumeratee は 1 つの列挙子を別の列挙子に変換するため、promise にマップする enumeratee を作成する方法が必要です。

さて、これをもう少し明確にするために例を挙げましょう。データベースでクエリする ID のリストを含む HTTP リクエストが届いたとします。これらの ID がデータベース テーブルの行を表し、リクエストがこれらの行に対して一連の (長い) 計算を実行し、計算を表す一連の json オブジェクトを返すとします。私は長い間ブロックする必要があるため、一度に 1 つの ID をストリーミングするのはクールなので、次のような enumeratee パイプラインが必要です。

  1. データベース内の行を照会します (行の promise を返します)
  2. 行に対して長い計算を行います (行を取り、計算の約束を返します)
  3. 長い計算を JSON に変換する
  4. &> これを Play 2 が提供する Comet enumeratee に渡します

1 はちょっと簡単です。クエリ結果の約束を返す fromCallback を使用して列挙子を作成できます。3 も簡単です。単純な Enumeratee.map です。

しかし、ステップ2のenumerateeのapplyOnを実装する方法に頭を悩ませることはできません。「内側の」イテラティーからプロミスを取得し、長い計算をflatMapして返す新しいイテラティーを構築する必要があることを理解できます。新しい約束。私が得られないのは、奇妙なapplyOn署名を考慮してこれを作成する方法です:def applyOn[A](it: Iteratee[To, A]): Iteratee[From, Iteratee[To, A]]

誰かがそれを手伝ってくれますか?

ありがとう

4

2 に答える 2

3

f: E => Promise[NE] を取り、 Enumeratee[E, NE] を返すマスター Enumeratee.mapM[E] のメソッドがあります。

https://github.com/playframework/Play20/blob/master/framework/src/play/src/main/scala/play/api/libs/iteratee/Enumeratee.scala#L150

于 2012-06-01T22:06:31.070 に答える
1

の署名はapplyOn、enumeratee が のように右側の iteratee と組み合わされていると考えると、より理にかなっていますenumerator |>> (enumeratee &> iteratee)。iteratee には型がIteratee[E, A]あり、列挙子は a を期待するIteratee[Promise[E], Iteratee[E, A]ため、内部の iteratee を抽出できます。したがって、 aを返さapplyOnなければなりません。Iteratee[E, A]Iteratee[Promise[E], Iteratee[E, A]

実装の概要は次のとおりです。入力を受け取り、期待される結果を返すステップ関数を定義します。次に、promise 要素を再帰的にステップ実行します。

import play.api.libs.concurrent._
import play.api.libs.iteratee._

def unpromise[E]: Enumeratee[Promise[E], E] = new Enumeratee[Promise[E], E] {
  def applyOn[A](inner: Iteratee[E, A]): Iteratee[Promise[E], Iteratee[E, A]] = {
    def step(input: Input[Promise[E]], i: Iteratee[E, A]): Iteratee[Promise[E], Iteratee[E, A]] = {
      input match {
        case Input.EOF => Done(i, Input.EOF)
        case Input.Empty => Cont(step(_, i))
        case Input.El(pe) =>
          val pe2 = pe.map(e => i.feed(Input.El(e))).flatMap(identity)
          val i2 = Iteratee.flatten(pe2)
          i2.pureFlatFold(
            (a, e2) => Done(i2, Input.Empty),
            k => Cont(step(_, i2)),
            (msg, e2) => Done(i2, Input.Empty))
      }
    }
    // should check that inner is not done or error - skipped for clarity
    Cont(step(_, inner))
  }
}

を破棄e2しているので、一部の入力が失われないようにするためのコードがおそらく他にもあります。

于 2012-05-22T14:47:41.217 に答える