同期の ItemProcessor と Writer を使用していましたが、次のコードのように非同期に移動しました。
@Bean
public Job importFraudCodeJob(Step computeFormFileToDB) {
return jobBuilderFactory.get("Import-Entities-Risk-Codes")
.incrementer(new RunIdIncrementer())
.listener(notificationExecutionListener)
.start(computeFormFileToDB)
.build();
}
@Bean
public Step computeFormFileToDB(ItemReader<EntityRiskCodesDto> entityRiskCodeFileReader) {
return stepBuilderFactory.get("ImportFraudCodesStep")
.<EntityFraudCodesDto, Future<EntityFraudCodes>>chunk(chunkSize)
.reader(entityRiskCodeFileReader)
.processor(asyncProcessor())
.writer(asyncWriter())
.faultTolerant()
.skipPolicy(customSkipPolicy)
.listener(customStepListener)
.listener(chunkCounterListener())
.taskExecutor(taskExecutor())
.throttleLimit(6)
.build();
}
私の ItemPocessor<I,O> では、@BeforeStep を使用して、StepExecutionContext に保存した値を取得します。
@BeforeStep
public void getKey(StepExecution stepExecution) {
log.info("Fetching batchNumber");
ExecutionContext context = stepExecution.getExecutionContext();
this.sequenceNumber = (Integer) context.get("sequenceNumber");
}
そして、ここで私の AsyncProcessor の宣言:
@Bean
public AsyncItemProcessor<EntityRiskCodesDto, EntityRiskCodes> asyncProcessor() {
var asyncItemProcessor = new AsyncItemProcessor<EntityRiskCodesDto, EntityRiskCodes>();
asyncItemProcessor.setDelegate(riskCodeItemProcessor());
asyncItemProcessor.setTaskExecutor(taskExecutor());
return asyncItemProcessor;
}
問題は、上記のメソッドが呼び出されていないことです。StepExecution から値を取得してAsynchronous ItemProcessor または AsyncItemWiter に渡すにはどうすればよいですか?