1

KStream からの出力のウィンドウ化されたバッチをグループ化し、それらをセカンダリ ストアに書き込むことを望んでいます。

.punctuate()おおよそ30秒ごとに呼び出されることを期待していました。代わりに得たものはここに保存されます。

(元のファイルは数千行の長さでした)

要約 -.punctuate()一見ランダムに、そして繰り返し呼び出されています。ProcessorContext.schedule()を介して設定された値に準拠していないようです。


編集:

同じコードをもう一度実行すると、.punctuate()約 4 分ごとに への呼び出しが生成されました。今回はクレイジーな繰り返し値は見られませんでした。ソースに変更はありません。結果が異なるだけです。

次のコードを使用します。

主要

StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

lines.process(new BPS2());

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

プロセッサ

public class BP2 extends AbstractProcessor<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);

    private ProcessorContext context;
    private final long delay;
    private final ArrayList<String> values;

    public BP2(long delay) {
        LOGGER.debug("BatchProcessor() constructor");
        this.delay = delay;

       values = new ArrayList<>();

    }

    @Override
    public void process(String s, String s2) {
        LOGGER.debug("batched processor s:{}   s2:{}", s, s2);

        values.add(s2);
    }

    @Override
    public void init(ProcessorContext context) {
        LOGGER.info("init");

        super.init(context);

        values.clear();

        this.context = context;
        context.schedule(delay);
    }

    @Override
    public void punctuate(long timestamp) {
        super.punctuate(timestamp);

        LOGGER.info("punctuate   ts: {}   count: {}", timestamp, values.size());

        context().commit();
    }
}

プロセッサーサプライヤー

public class BPS2 implements ProcessorSupplier<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);

    @Override
    public Processor<String, String> get() {
        try {
            return new BP2(30000);
        } catch(Exception exception) {
            LOGGER.error("Unable to instantiate BatchProcessor()", exception);
            throw new RuntimeException();
        }
    }
}

編集:

私のデバッガーがこれを遅くしていないことを確認するために、私はそれをビルドし、私のkafkaプロセスと同じボックスで実行しました. 今回は、4 分以上遅れることさえありませんでした。数秒以内に、偽の呼び出しを出力していました.punctuate()。これらの多く (ほとんど) には、 への呼び出しが介在していません.process()

4

3 に答える 3