この質問に答えて、Odomontois は、すべてをメモリに格納することなく、事前に並べ替えられたストリームをキーでグループ化できる遅延グループ化演算子を実装する方法を示しました。Akka のストリーム (つまり、Source オブジェクト) でこのようなことを行う方法はありますか? あるいは、Akka Source から通常の Stream オブジェクトを取り出して、Odomontois の chopBy を使用できるようにする方法はありますか?
以下は、うまくいかない、完全に失敗した試みです:
implicit class SourceChopOps[T, NU](s: Source[T, NU]) {
def chopBy[U](f: T => U) = {
s.prefixAndTail(1)
.map(pt => (pt._1.head, pt._2))
.map {
case (prefix, tail) =>
// what to do with pulled off head???
tail.takeWhile(e => f(e) == f(prefix)) ++ tail.dropWhile(e => f(e) == f(prefix)).chopBy(f) // fails here
}
}
}
}