いくつかの値のハッシュをキーとする JSON オブジェクトのストリームがあります。n 秒 (10? 60?) 間隔でキーごとにカウントし、これらの値を使用してパターン分析を行うことを望んでいます。
私のトポロジ:K->aggregateByKey(n seconds)->process()
ステップで、process - init()
私は呼び出さProcessorContent.schedule(60 * 1000L)
れることを期待して.punctuate()
呼び出しました。ここから、内部ハッシュの値をループして、それに応じて動作します。
値が集計ステップを通過してprocess()
関数にヒットするのを見ていますが、.punctuate()
呼び出されることはありません。
コード:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> opxLines = kStreamBuilder.stream(TOPIC);
KStream<String, String> mapped = opxLines.map(new ReMapper());
KTable<Windowed<String>, String> ktRtDetail = mapped.aggregateByKey(
new AggregateInit(),
new OpxAggregate(),
TimeWindows.of("opx_aggregate", 60000));
ktRtDetail.toStream().process(new ProcessorSupplier<Windowed<String>, String>() {
@Override
public Processor<Windowed<String>, String> get() {
return new AggProcessor();
}
});
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
AggregateInit()は null を返します。
単純なタイマーで同等のことができると思います.punctuate()
が、このコードが期待どおりに機能しない理由を知りたいです。