Spring Batch JSR-352 実装を使用して単純なバッチを作成しました。バッチは PartitionMapper を使用して、チャンクごとに個別のプロパティを挿入します。JSR-352 仕様 (partitionsOverride = False の場合) によると、パーティション チャンクの 1 つが失敗し、バッチが再開された場合、失敗したパーティション チャンクのみが再開されます。
たとえば、partition0、partition1、および partition2 の 3 つのパーティションがあるとします。パーティション 1 とパーティション 2 が失敗した場合、バッチはパーティション 1 とパーティション 2 のみを独自のバッチ プロパティで再起動する必要があります。
ただし、Spring Batch JSR-352 実装 (最新バージョン 3.0.3.Release) を使用すると、バッチを再起動すると、partition1 と partition2 ではなく partition0 と partition1 が再起動されることに気付きました。したがって、2 つのパーティションで障害が発生したことは正しく検出されますが、障害が発生したパーティションではなく、最初の (2 つの) パーティションが誤って再起動されます。
これは Spring Batch 実装のバグですか、それとも何か不足していますか?
JSR-352ドキュメントのセクション10.8.5を参照してください: http://download.oracle.com/otn-pub/jcp/batch-1_0_revA-mrel-spec/JSR_352-v1.0_Rev_a-Maintenance_Release.pdf
これが私が使用したコードです:
/META-INF/バッチジョブ/sampleBatch.xml
<?xml version="1.0" encoding="UTF-8"?>
<job id="sampleBatch" version="1.0" xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd">
<step id="sampleStep">
<chunk item-count="100">
<reader ref="com.springapp.batch.SampleReader">
<properties>
<property name="sample" value="#{partitionPlan['sample']}"/>
</properties>
</reader>
<writer ref="com.springapp.batch.SampleWriter">
<properties>
<property name="sample" value="#{partitionPlan['sample']}"/>
</properties>
</writer>
</chunk>
<partition>
<mapper ref="com.springapp.batch.SamplePartitionMapper"/>
</partition>
</step>
</job>
com.springapp.batch.SamplePartitionMapper:
package com.springapp.batch;
import java.util.Properties;
import javax.batch.api.partition.PartitionMapper;
import javax.batch.api.partition.PartitionPlan;
import javax.batch.api.partition.PartitionPlanImpl;
public class SamplePartitionMapper implements PartitionMapper {
@Override
public PartitionPlan mapPartitions() throws Exception {
final PartitionPlan partitionPlan = new PartitionPlanImpl();
int size = 3;
Properties[] partitionProps = new Properties[size];
for (int i=0; i<size; i++) {
final Properties properties = new Properties();
properties.put("sample", ""+i);
partitionProps[i] = properties;
System.out.println("mapPartitions: " + i);
}
partitionPlan.setThreads(1);
partitionPlan.setPartitions(partitionProps.length);
partitionPlan.setPartitionProperties(partitionProps);
return partitionPlan;
}
}
com.springapp.batch.SampleReader:
public class SampleReader extends AbstractItemReader {
@Inject
@BatchProperty
private String sample;
Iterator<Integer> iter;
@Override
public void open(Serializable checkpoint) throws Exception {
System.out.println("open for reading sample: " + sample);
ArrayList list = new ArrayList<Integer>();
for(int i=0; i<Integer.parseInt(sample); i++) {
list.add(new Integer(i));
}
iter = list.iterator();
}
@Override
public Integer readItem() throws Exception {
if(iter.hasNext())
return iter.next();
else
return null;
}
}
com.springapp.batch.SampleWriter:
public class SampleWriter extends AbstractItemWriter {
@Inject
@BatchProperty
private String sample;
@Override
public void writeItems(List<Object> items) throws Exception {
System.out.println("writeItems sample: " + sample);
if(sample.equals("1")) {
throw new Exception("FAIL PARTITION 1");
}
if(sample.equals("2")) {
throw new Exception("FAIL PARTITION 2");
}
for (Object itemObj : items) {
Integer item = (Integer) itemObj;
System.out.println(item);
}
}
}
TestJob テストランナー:
package com.springapp.batch;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.JobExecution;
import javax.inject.Inject;
import junit.framework.Assert;
import org.junit.Test;
//@RunWith(SpringJUnit4ClassRunner.class)
//@ContextConfiguration("classpath:spring-config.xml")
public class AppTests {
@Test
public void testJob() throws Exception {
JobOperator jobOperator = BatchRuntime.getJobOperator();
long jobExecution = jobOperator.start("sampleBatch", new Properties());
int attempt = 0;
while (true) {
JobExecution execution = jobOperator.getJobExecution(jobExecution);
if (execution.getEndTime() != null) {
//check status
if( "FAILED".equals(execution.getExitStatus()) && attempt < 3 ) {
attempt++;
System.out.println("Batch failed, trying to restart (attempt " + attempt + ")..");
jobExecution = jobOperator.restart(jobExecution, new Properties());
continue;
}
System.out.println("Batch ended with status: " + execution.getExitStatus());
break;
}
}
Assert.assertEquals("COMPLETED", jobOperator.getJobExecution(jobExecution).getExitStatus());
}
}