私が使うorg.apache.kafka:kafka-streams:0.10.0.1
KStream.Process()
トリガーする(「句読点」) をトリガーしていないように見える時系列ベースのストリームを操作しようとしています。(参考までにこちらをご覧ください)
構成ではKafkaStreams
、このパラメーターを渡しています(特に):
config.put(
StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
EventTimeExtractor.class.getName());
ここでは、 JSON データからタイムスタンプ情報を抽出EventTimeExtractor
するカスタム タイムスタンプ エクストラクタ ( を実装) を示します。org.apache.kafka.streams.processor.TimestampExtractor
新しいレコードが取り込まれるたびに、これが私のオブジェクト (から派生TimestampExtractor
) を呼び出すことを期待します。問題のストリームは 2 * 10^6 レコード/分です。punctuate()
60 秒に設定しましたが、起動しません。古い値を引っ張って追いつくため、データがこのスパンを非常に頻繁に通過することはわかっています。
実際、まったく呼び出されません。
- これは、KStream レコードにタイムスタンプを設定するための間違ったアプローチですか?
- これは、この構成を宣言する間違った方法ですか?