2

私はカフカストリーム2.1を使用しています

メッセージのストリームを ID で集約しようとしています。ほぼ同時に生成された同じ ID を持つ約 20 のメッセージがあります (2 つのメッセージ間の最大数百ミリ秒)。そのため、非アクティブ ギャップが 500 ミリ秒、猶予時間が 5 秒のセッション ウィンドウを使用しています。

入力レコードはキーとして ID を持ち、値はいくつかの文字列フィールド + 0 から数百のエントリを含むことができるマップで構成されます (キーは文字列で、値は 1 つの文字列フィールドを持つオブジェクトです)。

コードは次のとおりです。


private final Duration INACTIVITY_GAP = Duration.ofMillis(500);
private final Duration GRACE_TIME = Duration.ofMillis(5000);

KStream<String, MyCustomMessage> source = streamsBuilder.stream("inputTopic", Consumed.with(Serdes.String(), myCustomSerde));

source
  .groupByKey(Grouped.with(Serdes.String(), myCustomSerde))                
  .windowedBy(SessionWindows.with(INACTIVITY_GAP).grace(GRACE_TIME))
  .aggregate(
     // initializer
     () -> {
         return new CustomAggMessage();
     },
     // aggregates records in same session
     (s, message, aggMessage) -> {
        // ...
         return aggMessage;
     },
     // merge sessions
     (s, aggMessage1, aggMessage2) -> {
         // ... merge
         return aggMessage2;
     },
     Materialized.with(Serdes.String(), myCustomAggSerde)
   )
   .suppress(Suppressed.untilWindowCloses(unbounded()))
   .selectKey((windowed, o) -> windowed.key());
   .toStream().to("outputTopic")

別の Suppressed : も試し.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(30), maxBytes(1_000_000_000L).emitEarlyWhenFull()))ました。役に立ちませんでした。

私はカスタム rocksdb 設定を持っています:

public class CustomRocksDbConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
    }
}

入力トピックには 32 のパーティション (約 10k メッセージ/秒) があり、それぞれ 4 つのストリーム スレッドで 8 つのインスタンスを実行します。

これを実行すると。ヒープの使用率が非常に高く (最大ヒープが 4G に設定され、マシンには 8G が設定されています)、アプリがクラッシュして再起動するため、遅延が増加します。

誰かが理由を知っていますか?これを機能させるために何を変更できますか? セッションウィンドウとそのパラメータはこれを達成する正しい方法ですか?

4

0 に答える 0