I'm using Kafka Streams for a windowed aggregation. The logic is as follows:
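```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;

// builder (StreamsBuilder), topics, stringSerde and logger are defined elsewhere.
KStream<String, String> stream = builder.stream(topics, Consumed.with(stringSerde, stringSerde));
stream.groupByKey()
        // 2-minute tumbling windows with no grace period
        .windowedBy(TimeWindows.of(Duration.ofMinutes(2)).grace(Duration.ZERO))
        .aggregate(() -> "",
                (key, word, aggregate) -> {
                    logger.info("(key, word, aggregate): ({}, {}, {})", key, word, aggregate);
                    // Append the new word to this window's running aggregate
                    return aggregate + "-" + word;
                }, Materialized.with(stringSerde, stringSerde))
        // Drop the window from the key before logging each emitted update
        .toStream((k, v) -> k.key())
        .foreach((k, v) -> logger.info("Aggregated to Stream. (k,v): ({}, {})", k, v));
```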
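`TimeWindows.of(Duration.ofMinutes(2))` without an advance creates tumbling windows, which Kafka Streams aligns to the epoch: each window covers [start, start + 2 min), where start is a multiple of two minutes since 1970-01-01T00:00Z. The plain-Java sketch below (not a Kafka API; the class and method names are illustrative) computes which window a timestamp falls into. It assumes the record timestamps are close to the times printed in the log and reads those times as UTC; any whole-hour timezone offset leaves the two-minute boundaries unchanged.

```java
import java.time.Duration;
import java.time.Instant;

// Sketch: the epoch-aligned 2-minute tumbling window [start, start + size)
// that contains a given timestamp. Illustrative helper, not Kafka code.
class WindowAlignment {
    static final long SIZE_MS = Duration.ofMinutes(2).toMillis();

    // Inclusive start of the tumbling window containing ts.
    static Instant windowStart(Instant ts) {
        long ms = ts.toEpochMilli();
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, SIZE_MS));
    }

    public static void main(String[] args) {
        // Times taken from the log, interpreted as UTC (assumption).
        String[] times = {
            "2019-08-14T14:10:38.855Z", "2019-08-14T14:11:59.373Z", "2019-08-14T14:12:14.196Z"
        };
        for (String s : times) {
            Instant ts = Instant.parse(s);
            Instant start = windowStart(ts);
            System.out.println(ts + " -> [" + start + ", " + start.plusMillis(SIZE_MS) + ")");
        }
    }
}
```

Under these assumptions, the records up to 14:11:59 share the window [14:10, 14:12), while the record at 14:12:14 belongs to [14:12, 14:14).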
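This works most of the time, but I have observed the following issues:
- The aggregate is flushed downstream prematurely, before the window has closed.
- A new aggregation bucket is created even before the window has closed.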
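Both issues are visible in the following log: line (1) shows the premature flush, and line (2) shows the new bucket starting from an empty aggregate.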
[2019-08-14 14:10:38,855] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, a, )
(1)[2019-08-14 14:11:24,503] [ INFO] [prc-client-StreamThread-1] Aggregation (124) - Aggregated to Stream. (k,v): (1, -a)
[2019-08-14 14:11:27,735] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, b, -a)
[2019-08-14 14:11:43,298] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, f, -a-b)
[2019-08-14 14:11:59,373] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, b, -a-b-f)
(2)[2019-08-14 14:12:14,196] [ INFO] [prc-client-StreamThread-1] Aggregation (118) - (key, word, aggregate): (1, r, )
[2019-08-14 14:13:24,808] [ INFO] [prc-client-StreamThread-1] Aggregation (124) - Aggregated to Stream. (k,v): (1, -a-b-f-b)
[2019-08-14 14:13:24,808] [ INFO] [prc-client-StreamThread-1] Aggregation (124) - Aggregated to Stream. (k,v): (1, -r)
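Both observations may be consistent with Kafka Streams' default emission behaviour. A windowed aggregation produces a KTable whose updates are forwarded downstream whenever the record cache is flushed (on each commit, every `commit.interval.ms`, 30 s by default, or when the cache fills up per `cache.max.bytes.buffering`), not only when a window closes. For final-only results, Kafka Streams 2.1 added `suppress(Suppressed.untilWindowCloses(...))`. The record at 14:12:14 starting from an empty aggregate would likewise be expected if its timestamp falls into the next epoch-aligned window. The toy model below is plain Java, not Kafka Streams code, and contrasts the two strategies in simplified form. The flush instants, the use of logged times as record timestamps, and all class and method names are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT Kafka Streams code: records are aggregated into 2-minute
// tumbling windows ("" then aggregate + "-" + word, as in the question), and
// results are emitted under one of two simplified strategies.
class EmissionModel {
    static final long SIZE = 120; // window size in seconds

    // Seconds since midnight, standing in for record timestamps (assumption).
    static long t(int h, int m, int s) {
        return h * 3600L + m * 60L + s;
    }

    // Tumbling windows aligned to zero: [start, start + SIZE).
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, SIZE);
    }

    // Emit-on-flush: at each flush instant, emit the current value of every
    // window updated since the previous flush, so intermediate results escape.
    static List<String> emitOnFlush(long[] ts, String[] words, long[] flushes) {
        List<String> out = new ArrayList<>();
        Map<Long, String> aggs = new LinkedHashMap<>();
        Map<Long, String> dirty = new LinkedHashMap<>();
        int f = 0;
        for (int i = 0; i < ts.length; i++) {
            // Run every flush scheduled before this record arrives.
            while (f < flushes.length && flushes[f] < ts[i]) {
                out.addAll(dirty.values());
                dirty.clear();
                f++;
            }
            long w = windowStart(ts[i]);
            String agg = aggs.getOrDefault(w, "") + "-" + words[i];
            aggs.put(w, agg);
            dirty.put(w, agg);
        }
        // One more flush after the last record, if any flush is still pending.
        if (f < flushes.length) {
            out.addAll(dirty.values());
        }
        return out;
    }

    // Emit-on-close: each window is emitted once, when stream time (the
    // largest timestamp seen so far) reaches its end; grace is zero.
    static List<String> emitOnClose(long[] ts, String[] words) {
        List<String> out = new ArrayList<>();
        Map<Long, String> open = new LinkedHashMap<>();
        long streamTime = Long.MIN_VALUE;
        for (int i = 0; i < ts.length; i++) {
            streamTime = Math.max(streamTime, ts[i]);
            long w = windowStart(ts[i]);
            if (w + SIZE > streamTime) { // late records for closed windows are dropped
                open.put(w, open.getOrDefault(w, "") + "-" + words[i]);
            }
            // Emit and forget every window whose end stream time has reached.
            Iterator<Map.Entry<Long, String>> it = open.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, String> e = it.next();
                if (e.getKey() + SIZE <= streamTime) {
                    out.add(e.getValue());
                    it.remove();
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Record times and words from the log; flush instants chosen to match
        // the two "Aggregated to Stream" times (assumption for illustration).
        long[] ts = {t(14, 10, 38), t(14, 11, 27), t(14, 11, 43), t(14, 11, 59), t(14, 12, 14)};
        String[] words = {"a", "b", "f", "b", "r"};
        long[] flushes = {t(14, 11, 24), t(14, 13, 24)};
        System.out.println("emit-on-flush: " + emitOnFlush(ts, words, flushes));
        System.out.println("emit-on-close: " + emitOnClose(ts, words));
    }
}
```

With these inputs, emit-on-flush yields `[-a, -a-b-f-b, -r]`, the same three values as the log, while emit-on-close yields only `[-a-b-f-b]`. In the model, the [14:12, 14:14) window holding `-r` is not emitted until a later record advances stream time past 14:14; suppression in Kafka Streams likewise depends on stream time, which only advances as new records arrive.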
Is there a way to address these issues?