4

Iteratees と Enumerator を使用する playframework の非同期 I/O ライブラリを使用しています。これで、データ シンクとして Iterator[T] が作成されました (簡単にするために、内容をファイルに格納する Iterator[Byte] とします)。この Iterator[Byte] は、書き込みを処理する関数に渡されます。

しかし、書き込みの前に、ファイルの先頭にいくつかの統計情報を追加したいので (簡単にするために 1 バイトとします)、書き込み関数に渡す前に次の方法でイテレータを転送します。

def write(value: Byte, output: Iteratee[Byte]): Iteratee[Byte] =
    Iteratee.flatten(output.feed(Input.El(value)))

保存されたファイルをディスクから読み取ると、そのファイルの Enumerator[Byte] が取得されます。最初に、追加のデータを読み取って削除し、残りの Enumerator[Byte] を読み取りを処理する関数に渡したいと考えています。したがって、列挙子も変換する必要があります。

def read(input: Enumerator[Byte]): (Byte, Enumerator[Byte]) = {
   val firstEnumeratorEntry = ...
   val remainingEnumerator = ...
   (firstEnumeratorEntry, remainingEnumerator)
}

しかし、これを行う方法がわかりません。Enumerator からいくつかのバイトを読み取り、残りの Enumerator を取得するにはどうすればよいですか?

Iteratee[Byte] を OutputStream に、Enumerator[Byte] を InputStream に置き換えると、これは非常に簡単になります。

def write(value: Byte, output: OutputStream) = {
    output.write(value)
    output
}
def read(input: InputStream) = (input.read,input)

しかし、play フレームワークの非同期 I/O が必要です。

4

3 に答える 3

3

別の角度から目標に取り組んでいただけないでしょうか。

残りの列挙子を使用するその関数を と呼びましょうremaining。おそらく、残りの処理を行うために iteratee に適用されますremaining |>> iteratee。つまり、別の iteratee を生成します。その結果の iteratee を呼び出しましょうiteratee2...への参照を取得できるかどうかを確認できますかiteratee2? その場合は、最初の iteratee を使用して最初のバイトを取得して処理しhead、次に flatMap を結合headiteratee2て使用できます。

val head = Enumeratee.take[Byte](1) &>> Iteratee.foreach[Byte](println)
val processing = for { h <- head; i <- iteratee2 } yield (h, i)
Iteratee.flatten(processing).run

把握できない場合 (iteratee2列挙子が実装していない列挙型と結合する場合など)、このアプローチは機能しません。

于 2012-06-09T06:07:55.663 に答える
1

Iteratee適切な (一種の) State アキュムレータ (ここではタプル)内で折り畳むことによって、これを達成する 1 つの方法を次に示します。

routesファイルを読みに行くと、最初のバイトは a として読み取られ、もう 1 つはUTF-8 バイト文字列としてChara に追加されます。String

  def index = Action {
    /*let's do everything asyncly*/
    Async {
      /*for comprehension for read-friendly*/
      for (
        i <- read; /*read the file */
        (r:(Option[Char], String)) <- i.run /*"create" the related Promise and run it*/
      ) yield Ok("first : " + r._1.get + "\n" + "rest" + r._2) /* map the Promised result in a correct Request's Result*/
    }
  }


  def read = {
    //get the routes file in an Enumerator
    val file: Enumerator[Array[Byte]] = Enumerator.fromFile(Play.getFile("/conf/routes"))

    //apply the enumerator with an Iteratee that folds the data as wished
    file(Iteratee.fold((None, ""):(Option[Char], String)) { (acc, b) =>
       acc._1 match {
         /*on the first chunk*/ case None => (Some(b(0).toChar), acc._2 + new String(b.tail, Charset.forName("utf-8")))
         /*on other chunks*/ case x => (x, acc._2 + new String(b, Charset.forName("utf-8")))
       }
    })

  }

編集

使用するさらに別の方法を見つけましたが、2秒 (1 つは短命)Enumerateeを作成する必要があります。Enumeratorしかし、それはもう少しエレガントです。「種類の」列挙型を使用しますが、Traversal列挙型よりも細かいレベル (チャンク レベル) で動作するものを使用します。1 バイトのみを使用するtake1 を使用してから、ストリームを閉じます。もう 1 つはdrop、最初のバイトを削除するだけです (Enumerator[Array[Byte]] を使用しているため)。

さらに、read22 つの列挙子 (Promise、列挙子からそう遠くない) を返すため、希望するよりもはるかに近い署名が得られます。

def index = Action {
  Async {
    val (first, rest) = read2
    val enee = Enumeratee.map[Array[Byte]] {bs => new String(bs, Charset.forName("utf-8"))}

    def useEnee(enumor:Enumerator[Array[Byte]]) = Iteratee.flatten(enumor &> enee |>> Iteratee.consume[String]()).run.asInstanceOf[Promise[String]]

    for {
      f <- useEnee(first);
      r <- useEnee(rest)
    } yield Ok("first : " + f + "\n" + "rest" + r)
  }
}

def read2 = {
  def create = Enumerator.fromFile(Play.getFile("/conf/routes"))

  val file: Enumerator[Array[Byte]] = create
  val file2: Enumerator[Array[Byte]] = create

  (file &> Traversable.take[Array[Byte]](1), file2 &> Traversable.drop[Array[Byte]](1))

}
于 2012-06-08T17:31:25.830 に答える
1

実際、私たちIterateeは作曲するので s が好きです。したがってEnumerator、元の s から複数の s を作成する代わりに、2 つの Iteratee を順番に構成し (read-first と read-rest)、単一の Enumerator をフィードします。

このためには、シーケンシャル コンポジション メソッドが必要です。今はそれを と呼びますandThen。これが大まかな実装です。消費されていない入力を返すのは少し厳しいことに注意してください。入力タイプに基づいた型クラスで動作をカスタマイズできる可能性があります。また、最初のイテレータから 2 番目のイテレータに残ったものを渡す処理も行いません (演習 :)。

object Iteratees {
  def andThen[E, A, B](a: Iteratee[E, A], b: Iteratee[E, B]): Iteratee[E, (A,B)] = new Iteratee[E, (A,B)] {
    def fold[C](
        done: ((A, B), Input[E]) => Promise[C],
        cont: ((Input[E]) => Iteratee[E, (A, B)]) => Promise[C],
        error: (String, Input[E]) => Promise[C]): Promise[C] = {

      a.fold(
        (ra, aleft) => b.fold(
          (rb, bleft) => done((ra, rb), aleft /* could be magicop(aleft, bleft)*/),
          (bcont) => cont(e => bcont(e) map (rb => (ra, rb))),
          (s, err) => error(s, err)
        ),
        (acont) => cont(e => andThen[E, A, B](acont(e), b)),
        (s, err) => error(s, err)
      )
    }
  }
}

これで、次のものを使用できます。

object Application extends Controller {

  def index = Action { Async {

    val strings: Enumerator[String] = Enumerator("1","2","3","4")
    val takeOne = Cont[String, String](e => e match {
      case Input.El(e) => Done(e, Input.Empty)
      case x => Error("not enough", x)
    })
    val takeRest = Iteratee.consume[String]()
    val firstAndRest = Iteratees.andThen(takeOne, takeRest)

    val futureRes = strings(firstAndRest) flatMap (_.run)

    futureRes.map(x => Ok(x.toString)) // prints (1,234)
  } }

}
于 2012-06-08T19:56:22.313 に答える