5

これに本当に似た何かをする必要があります https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala

私の問題は、不明な数のグループがあり、mapAsync の並列処理の数が取得したグループの数より少なく、最後のシンクでエラーが発生することです。

アップストリーム エラー (akka.stream.impl.StreamSubscriptionTimeoutSupport$$anon$2) による SynchronousFileSink(/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt) の破棄

akkaストリームのパターンガイドhttp://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.htmlで提案されているように、途中にバッファを配置しようとしました

groupBy {
  case LoglevelPattern(level) => level
  case other                  => "OTHER"
}.buffer(1000, OverflowStrategy.backpressure).
  // write lines of each group to a separate file
  mapAsync(parallelism = 2) {....

しかし、同じ結果で

4

1 に答える 1