これに本当に似た何かをする必要があります https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala
私の問題は、不明な数のグループがあり、mapAsync の並列処理の数が取得したグループの数より少なく、最後のシンクでエラーが発生することです。
アップストリーム エラー (akka.stream.impl.StreamSubscriptionTimeoutSupport$$anon$2) による SynchronousFileSink(/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt) の破棄
akkaストリームのパターンガイドhttp://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.htmlで提案されているように、途中にバッファを配置しようとしました
groupBy {
case LoglevelPattern(level) => level
case other => "OTHER"
}.buffer(1000, OverflowStrategy.backpressure).
// write lines of each group to a separate file
mapAsync(parallelism = 2) {....
しかし、同じ結果で