KStream からの出力のウィンドウ化されたバッチをグループ化し、それらをセカンダリ ストアに書き込むことを望んでいます。
.punctuate()
おおよそ30秒ごとに呼び出されることを期待していました。代わりに得たものはここに保存されます。
(元のファイルは数千行の長さでした)
要約 -.punctuate()
一見ランダムに、そして繰り返し呼び出されています。ProcessorContext.schedule()を介して設定された値に準拠していないようです。
編集:
同じコードをもう一度実行すると、.punctuate()
約 4 分ごとに への呼び出しが生成されました。今回はクレイジーな繰り返し値は見られませんでした。ソースに変更はありません。結果が異なるだけです。
次のコードを使用します。
主要
StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
lines.process(new BPS2());
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
プロセッサ
public class BP2 extends AbstractProcessor<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);
private ProcessorContext context;
private final long delay;
private final ArrayList<String> values;
public BP2(long delay) {
LOGGER.debug("BatchProcessor() constructor");
this.delay = delay;
values = new ArrayList<>();
}
@Override
public void process(String s, String s2) {
LOGGER.debug("batched processor s:{} s2:{}", s, s2);
values.add(s2);
}
@Override
public void init(ProcessorContext context) {
LOGGER.info("init");
super.init(context);
values.clear();
this.context = context;
context.schedule(delay);
}
@Override
public void punctuate(long timestamp) {
super.punctuate(timestamp);
LOGGER.info("punctuate ts: {} count: {}", timestamp, values.size());
context().commit();
}
}
プロセッサーサプライヤー
public class BPS2 implements ProcessorSupplier<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);
@Override
public Processor<String, String> get() {
try {
return new BP2(30000);
} catch(Exception exception) {
LOGGER.error("Unable to instantiate BatchProcessor()", exception);
throw new RuntimeException();
}
}
}
編集:
私のデバッガーがこれを遅くしていないことを確認するために、私はそれをビルドし、私のkafkaプロセスと同じボックスで実行しました. 今回は、4 分以上遅れることさえありませんでした。数秒以内に、偽の呼び出しを出力していました.punctuate()
。これらの多く (ほとんど) には、 への呼び出しが介在していません.process()
。