0

以下は、etl.xml のジョブの構成です。

<batch:job id="procuerJob">

<batch:step id="Produce">

    <batch:partition partitioner="partitioner">

        <batch:handler grid-size="${ partitioner.limit}"></batch:handler>

        <batch:step>

            <batch:tasklet>

                <batch:chunk reader="Reader" writer="kafkaProducer"
                             commit-interval="20000">

                </batch:chunk>

                <batch:listeners>

                    <batch:listener ref="producingListener" />

                    
                </batch:listeners>

            </batch:tasklet>

        </batch:step>

    </batch:partition>

</batch:step>

</batch:ジョブ>

以下は、トピックにメッセージを送信するために使用されるコードです。

ListenableFuture<SendResult<String, message>> listenableFuture = kafkaTemplate.send(メッセージ);

listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, message >>() {

@Override
public void onSuccess(SendResult<String, message > result) {
    log.info("marking as SUCCESS");
    manager.updateStatus(“someTable”, KafkaResponse.SUCCESS);
}

@Override
public void onFailure(Throwable ex) {
    log.info("marking as FAILURE");
    manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
}

}

kafkaTemplate.send(message) が実行されると、リスナーが呼び出され、ジョブが完了します。ジョブが完了した後に onSuccess()、onFailure() が呼び出されるのがわかります。kafkaトピックから確認を受け取った後にリスナーが呼び出されるように、ジョブの構成を変更するにはどうすればよいですか?

4

1 に答える 1