以下は、etl.xml のジョブの構成です。
<batch:job id="procuerJob">
<batch:step id="Produce">
<batch:partition partitioner="partitioner">
<batch:handler grid-size="${ partitioner.limit}"></batch:handler>
<batch:step>
<batch:tasklet>
<batch:chunk reader="Reader" writer="kafkaProducer"
commit-interval="20000">
</batch:chunk>
<batch:listeners>
<batch:listener ref="producingListener" />
</batch:listeners>
</batch:tasklet>
</batch:step>
</batch:partition>
</batch:step>
</batch:ジョブ>
以下は、トピックにメッセージを送信するために使用されるコードです。
ListenableFuture<SendResult<String, message>> listenableFuture = kafkaTemplate.send(メッセージ);
listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, message >>() {
@Override
public void onSuccess(SendResult<String, message > result) {
log.info("marking as SUCCESS");
manager.updateStatus(“someTable”, KafkaResponse.SUCCESS);
}
@Override
public void onFailure(Throwable ex) {
log.info("marking as FAILURE");
manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
}
}
kafkaTemplate.send(message) が実行されると、リスナーが呼び出され、ジョブが完了します。ジョブが完了した後に onSuccess()、onFailure() が呼び出されるのがわかります。kafkaトピックから確認を受け取った後にリスナーが呼び出されるように、ジョブの構成を変更するにはどうすればよいですか?