4

akka-streamsession_uid によって無限ストリームからのイベントをグループ化するためのフローを記述し、各セッションのトラフィックの合計を計算したいと考えています(詳細は前の質問で)。

session_uid によるグループ イベントの関数を使用する予定ですSource#groupByが、この関数はすべてのグループ キーを内部に蓄積し、それらを解放する方法がないようです。これによりjava.lang.OutOfMemoryError: Java heap space例外が発生します。これを再現するためのコードは次のとおりです。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.util.Random

object GroupByMemoryLeakApplication extends App {
  implicit val system = ActorSystem()
  import system.dispatcher

  implicit val materializer = ActorMaterializer()

  val bigString = Random.nextString(512 * 1024)

  // This is infinite stream of events (i.e. this is session ids)
  val eventsSource = Source(() => (1 to 1000000000).iterator)
    .map((i) => { (i, bigString + i) })

  // This is flow pass event through groupBy function
  val groupByFlow = Flow[(Int, String)]
    .groupBy(_._2)
    .map {
      case (sessionUid, sessionEvents) =>
        sessionEvents
          .map(e => { println(e._1); e })
          .runWith(Sink.head)
    }
    .mapAsync(4)(identity)

  eventsSource
    .via(groupByFlow)
    .runWith(Sink.ignore)
    .onComplete(_ => system.shutdown())
}

では、関連するイベント ストリーム ( ) の処理が完了した後、sessionUid内部でグループ化キー ( ) を解放するにはどうすればよいでしょうか。groupBysessionEvents

session_uid ベースでイベントをグループ化する別の方法を知っている人はいakka-streamますか?

4

0 に答える 0