驚いたことに、Iteratees とエラー処理にいくつか問題があります。
問題;
(ネットワークから、InputStream である必要があります)からいくつかのバイトを読み取りInputStream
、この InputStream でいくつかのチャンク/グルーミングを実行し (作業分散のために)、これcase class DataBlock(blockNum: Int, data: ByteString)
をアクターに送信するための に変換する変換が続きます (Array[Bytes] は に変換されますCompactByteString)。
流れ;
InputStream.read -- bytes --> Group -- 1000 byte blocks --> Transform -- DataBlock --> Actors
コード;
class IterateeTest {
val actor: ActorRef = myDataBlockRxActor(...)
val is = new IntputStream(fromBytes...)
val source = Enumerator.fromStream(is)
val chunker = Traversable.takeUpTo[Array[Byte]](1000)
val transform:Iteratee[Array[Byte], Int] = Iteratee.fold[Array[Byte],Int](0) {
(bNum, bytes) => DataBlock(bNum, CompactByteString(bytes)); bNum + 1
}
val fut = source &> chunker |>> transform
}
case class DataBlock(blockNum: Int, data: CompactByteString)
質問;
私の現在の Iteratee コードはうまく機能します。ただし、どちらの側でも障害を処理できるようにしたいと考えています。
- InputStream
read
メソッドが失敗した場合 - 正常に処理されたバイト数/ブロック数を知り、その時点からストリームの読み取りを再開したい。列挙子を読み込むとエラーがスローされ、fut
例外が返されるだけで、状態がないため、rxing アクターに渡さない限り、現在どのブロックにいるのかわかりません (これはやりたくないことです)。 - 出力側が失敗した場合、またはアクタのバッファが入力ストリームからの読み取りを完全に保持しているため、DataBlock メッセージを受信できなくなった場合
どうすればいいですか?
定義済みのエラー処理が必要なため、Play の iteratees よりも react-streams/Akka-stream (実験的) または scalaz iteratees を使用してこれを試した方がよいでしょうか?