1

いくつかの値のハッシュをキーとする JSON オブジェクトのストリームがあります。n 秒 (10? 60?) 間隔でキーごとにカウントし、これらの値を使用してパターン分析を行うことを望んでいます。

私のトポロジ:K->aggregateByKey(n seconds)->process()

ステップで、process - init()私は呼び出さProcessorContent.schedule(60 * 1000L)れることを期待して.punctuate()呼び出しました。ここから、内部ハッシュの値をループして、それに応じて動作します。

値が集計ステップを通過してprocess()関数にヒットするのを見ていますが、.punctuate()呼び出されることはありません。


コード:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> opxLines = kStreamBuilder.stream(TOPIC);

KStream<String, String> mapped = opxLines.map(new ReMapper());

KTable<Windowed<String>, String> ktRtDetail = mapped.aggregateByKey(
            new AggregateInit(),
            new OpxAggregate(),
            TimeWindows.of("opx_aggregate", 60000));

ktRtDetail.toStream().process(new ProcessorSupplier<Windowed<String>, String>() {
                            @Override
                            public Processor<Windowed<String>, String> get() {
                                 return new AggProcessor();
                            }
                       });
    
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

AggregateInit()は null を返します。

単純なタイマーで同等のことができると思います.punctuate()が、このコードが期待どおりに機能しない理由を知りたいです。

4

1 に答える 1