0

この質問に答えて、Odomontois は、すべてをメモリに格納することなく、事前に並べ替えられたストリームをキーでグループ化できる遅延グループ化演算子を実装する方法を示しました。Akka のストリーム (つまり、Source オブジェクト) でこのようなことを行う方法はありますか? あるいは、Akka Source から通常の Stream オブジェクトを取り出して、Odomontois の chopBy を使用できるようにする方法はありますか?

以下は、うまくいかない、完全に失敗した試みです:

  implicit class SourceChopOps[T, NU](s: Source[T, NU]) {
    def chopBy[U](f: T => U) = {
      s.prefixAndTail(1)
        .map(pt => (pt._1.head, pt._2))
        .map {
          case (prefix, tail) =>
            // what to do with pulled off head???
            tail.takeWhile(e => f(e) == f(prefix)) ++ tail.dropWhile(e => f(e) == f(prefix)).chopBy(f) // fails here
        }
      }
    }
  }
4

1 に答える 1