私はカフカストリーム2.1を使用しています
メッセージのストリームを ID で集約しようとしています。ほぼ同時に生成された同じ ID を持つ約 20 のメッセージがあります (2 つのメッセージ間の最大数百ミリ秒)。そのため、非アクティブ ギャップが 500 ミリ秒、猶予時間が 5 秒のセッション ウィンドウを使用しています。
入力レコードはキーとして ID を持ち、値はいくつかの文字列フィールド + 0 から数百のエントリを含むことができるマップで構成されます (キーは文字列で、値は 1 つの文字列フィールドを持つオブジェクトです)。
コードは次のとおりです。
private final Duration INACTIVITY_GAP = Duration.ofMillis(500);
private final Duration GRACE_TIME = Duration.ofMillis(5000);
KStream<String, MyCustomMessage> source = streamsBuilder.stream("inputTopic", Consumed.with(Serdes.String(), myCustomSerde));
source
.groupByKey(Grouped.with(Serdes.String(), myCustomSerde))
.windowedBy(SessionWindows.with(INACTIVITY_GAP).grace(GRACE_TIME))
.aggregate(
// initializer
() -> {
return new CustomAggMessage();
},
// aggregates records in same session
(s, message, aggMessage) -> {
// ...
return aggMessage;
},
// merge sessions
(s, aggMessage1, aggMessage2) -> {
// ... merge
return aggMessage2;
},
Materialized.with(Serdes.String(), myCustomAggSerde)
)
.suppress(Suppressed.untilWindowCloses(unbounded()))
.selectKey((windowed, o) -> windowed.key());
.toStream().to("outputTopic")
別の Suppressed : も試し.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(30), maxBytes(1_000_000_000L).emitEarlyWhenFull()))ました。役に立ちませんでした。
私はカスタム rocksdb 設定を持っています:
public class CustomRocksDbConfig implements RocksDBConfigSetter {
@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
tableConfig.setCacheIndexAndFilterBlocks(true);
options.setTableFormatConfig(tableConfig);
}
}
入力トピックには 32 のパーティション (約 10k メッセージ/秒) があり、それぞれ 4 つのストリーム スレッドで 8 つのインスタンスを実行します。
これを実行すると。ヒープの使用率が非常に高く (最大ヒープが 4G に設定され、マシンには 8G が設定されています)、アプリがクラッシュして再起動するため、遅延が増加します。
誰かが理由を知っていますか?これを機能させるために何を変更できますか? セッションウィンドウとそのパラメータはこれを達成する正しい方法ですか?