4

ObservableMonix でキーごとに単一の分割を実行してからn、すべての最後のイベントにグループ化GrouppedObservableし、さらに処理するために送信します。問題は、グループ化するキーの数が無限である可能性があり、メモリ リークが発生することです。

アプリケーションのコンテキスト:

多くの会話からのメッセージを含むカフカ ストリームがあります。各会話にはroomId、この id をグループ化して、それぞれが単一の会話からのメッセージのみを含むオブザーバブルのコレクションを取得したいと考えています。通常、会話ルームは短命です。つまり、一意の で新しい会話が作成され、roomId短期間に数十のメッセージが交換され、その後会話が閉じられます。メモリ リークを回避するために、最新の会話を 100 ~ 1000 だけバッファに保持し、古い会話は破棄したいと考えています。そのため、イベントが長い間見られなかった会話から発生した場合、以前のメッセージを含むバッファーが忘れられるため、新しい会話として扱われます。

MonixkeysBufferの groupBy メソッドには、キー バッファーの処理方法を指定する引数があります。

DropOldストラテジーに指定するkeyBufferことで、私が望んでいた動作を実現できると思いました。

以下は、説明されている使用例の簡略化されたバージョンです。

import monix.execution.Scheduler.Implicits.global
import monix.reactive._

import scala.concurrent.duration._
import scala.util.Random

case class Event(key: Key, value: String, seqNr: Int) {
  override def toString: String = s"(k:$key;s:$seqNr)"
}

case class Key(conversationId: Int, messageNr: Int)

object Main {
  def main(args: Array[String]): Unit = {

    val fakeConsumer = Consumer.foreach(println)
    val kafkaSimulator = Observable.interval(1.millisecond)
      .map(n => generateHeavyEvent(n.toInt))

    val groupedMessages = kafkaSimulator.groupBy(_.key)(OverflowStrategy.DropOld(50))
      .mergeMap(slidingWindow)

    groupedMessages.consumeWith(fakeConsumer).runSyncUnsafe()
  }

  def slidingWindow[T](source: Observable[T]): Observable[Seq[T]] =
    source.scan(List.empty[T])(fixedSizeList)

  def fixedSizeList[T](list: List[T], elem: T): List[T] =
    (list :+ elem).takeRight(5)

  def generateHeavyEvent(n: Int): Event = {
    val conversationId: Int = n / 500
    val messageNr: Int = n % 5
    val key = Key(conversationId, messageNr)
    val value = (1 to 1000).map(_ => Random.nextPrintableChar()).toString()
    Event(key, value, n)
  }

}

ただし、VisualVM でアプリケーション ヒープを観察すると、メモリ リークが示されます。30分ほど走った後、java.lang.OutOfMemoryError: GC overhead limit exceeded

以下は、アプリを約 30 分間実行したときのヒープ使用量プロットのスクリーンショットです。(最後に平らにした部分が後OutOfMemoryError)

アプリケーションの VisualVM ヒープ プロット

私の質問は次のとおりです:メモリをリークすることなく、おそらく無限の数のキーで monix のイベントをグループ化するにはどうすればよいですか? 古い鍵はドロップできます

背景情報:

  • モニクスのバージョン:3.0.0-RC2
  • スカラバージョン:2.12.8
4

1 に答える 1