Observable
Monix でキーごとに単一の分割を実行してからn
、すべての最後のイベントにグループ化GrouppedObservable
し、さらに処理するために送信します。問題は、グループ化するキーの数が無限である可能性があり、メモリ リークが発生することです。
アプリケーションのコンテキスト:
多くの会話からのメッセージを含むカフカ ストリームがあります。各会話にはroomId
、この id をグループ化して、それぞれが単一の会話からのメッセージのみを含むオブザーバブルのコレクションを取得したいと考えています。通常、会話ルームは短命です。つまり、一意の で新しい会話が作成され、roomId
短期間に数十のメッセージが交換され、その後会話が閉じられます。メモリ リークを回避するために、最新の会話を 100 ~ 1000 だけバッファに保持し、古い会話は破棄したいと考えています。そのため、イベントが長い間見られなかった会話から発生した場合、以前のメッセージを含むバッファーが忘れられるため、新しい会話として扱われます。
MonixkeysBuffer
の groupBy メソッドには、キー バッファーの処理方法を指定する引数があります。
DropOldストラテジーに指定するkeyBuffer
ことで、私が望んでいた動作を実現できると思いました。
以下は、説明されている使用例の簡略化されたバージョンです。
import monix.execution.Scheduler.Implicits.global
import monix.reactive._
import scala.concurrent.duration._
import scala.util.Random
case class Event(key: Key, value: String, seqNr: Int) {
override def toString: String = s"(k:$key;s:$seqNr)"
}
case class Key(conversationId: Int, messageNr: Int)
object Main {
def main(args: Array[String]): Unit = {
val fakeConsumer = Consumer.foreach(println)
val kafkaSimulator = Observable.interval(1.millisecond)
.map(n => generateHeavyEvent(n.toInt))
val groupedMessages = kafkaSimulator.groupBy(_.key)(OverflowStrategy.DropOld(50))
.mergeMap(slidingWindow)
groupedMessages.consumeWith(fakeConsumer).runSyncUnsafe()
}
def slidingWindow[T](source: Observable[T]): Observable[Seq[T]] =
source.scan(List.empty[T])(fixedSizeList)
def fixedSizeList[T](list: List[T], elem: T): List[T] =
(list :+ elem).takeRight(5)
def generateHeavyEvent(n: Int): Event = {
val conversationId: Int = n / 500
val messageNr: Int = n % 5
val key = Key(conversationId, messageNr)
val value = (1 to 1000).map(_ => Random.nextPrintableChar()).toString()
Event(key, value, n)
}
}
ただし、VisualVM でアプリケーション ヒープを観察すると、メモリ リークが示されます。30分ほど走った後、java.lang.OutOfMemoryError: GC overhead limit exceeded
以下は、アプリを約 30 分間実行したときのヒープ使用量プロットのスクリーンショットです。(最後に平らにした部分が後OutOfMemoryError
)
私の質問は次のとおりです:メモリをリークすることなく、おそらく無限の数のキーで monix のイベントをグループ化するにはどうすればよいですか? 古い鍵はドロップできます
背景情報:
- モニクスのバージョン:
3.0.0-RC2
- スカラバージョン:
2.12.8