2

驚いたことに、Iteratees とエラー処理にいくつか問題があります。

問題;

(ネットワークから、InputStream である必要があります)からいくつかのバイトを読み取りInputStream、この InputStream でいくつかのチャンク/グルーミングを実行し (作業分散のために)、これcase class DataBlock(blockNum: Int, data: ByteString)をアクターに送信するための に変換する変換が続きます (Array[Bytes] は に変換されますCompactByteString)。

流れ;

InputStream.read -- bytes --> Group -- 1000 byte blocks --> Transform -- DataBlock --> Actors

コード;

class IterateeTest {
  val actor: ActorRef = myDataBlockRxActor(...)
  val is = new IntputStream(fromBytes...)
  val source = Enumerator.fromStream(is)
  val chunker = Traversable.takeUpTo[Array[Byte]](1000)
  val transform:Iteratee[Array[Byte], Int] = Iteratee.fold[Array[Byte],Int](0) {
    (bNum, bytes) => DataBlock(bNum, CompactByteString(bytes)); bNum + 1
  }
  val fut = source &> chunker |>> transform
}

case class DataBlock(blockNum: Int, data: CompactByteString)

質問;

私の現在の Iteratee コードはうまく機能します。ただし、どちらの側でも障害を処理できるようにしたいと考えています。

  1. InputStreamreadメソッドが失敗した場合 - 正常に処理されたバイト数/ブロック数を知り、その時点からストリームの読み取りを再開したい。列挙子を読み込むとエラーがスローされ、fut例外が返されるだけで、状態がないため、rxing アクターに渡さない限り、現在どのブロックにいるのかわかりません (これはやりたくないことです)。
  2. 出力側が失敗した場合、またはアクタのバッファが入力ストリームからの読み取りを完全に保持しているため、DataBlock メッセージを受信できなくなった場合

どうすればいいですか?

定義済みのエラー処理が必要なため、Play の iteratees よりも react-streams/Akka-stream (実験的) または scalaz iteratees を使用してこれを試した方がよいでしょうか?

4

1 に答える 1

2

(1) で実装できますEnumeratee

val (counter, getCount) = {
  var count = 0
  (Enumeratee.map { x => count += 1; x },
   () => count)
}
val fut = source &> counter &> chunker |>> transform

その後、 aなどgetCountの内部で使用できます。recoverfut

(2) は Play Iteratees で無料で入手できます。Iterateeが追加のデータの準備ができるまで、それ以上の読み取りは行われず、失敗した場合はそれ以上の読み取りは行われません。が完了すると、障害または正常終了に関係なく、InputStreamは自動的に閉じられます。Enumerator

于 2014-06-16T21:53:08.730 に答える