アレックスが言ったように、この動作は次の javadocs による契約のようです:
サブクラスは、次の結果を取得するメソッド * と、すべての結果が同時プロセスまたはスレッドから返されるのを待つメソッド * を提供する必要があります。
見る:
TaskExecutorRepeatTemplate#waitForResults
別のオプションは、 Partitioning を使用することです。
- Partitionned ItemReader からアイテムを実行する TaskExecutorPartitionHandler。以下を参照してください。
- ItemReader によって処理される範囲を提供する Partitioner の実装。以下の ColumnRangePartitioner を参照してください。
- Partitioner が埋めたものを使用してデータを読み取る CustomReader。以下の myItemReader 構成を参照してください。
Michael Minella は、彼の著書Pro Spring Batchの第 11 章でこれについて説明しています。
<batch:job id="batchWithPartition">
<batch:step id="step1.master">
<batch:partition partitioner="myPartitioner" handler="partitionHandler"/>
</batch:step>
</batch:job>
<!-- This one will create Paritions of Number of lines/ Grid Size-->
<bean id="myPartitioner" class="....ColumnRangePartitioner"/>
<!-- This one will handle every partition in a Thread -->
<bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="multiThreadedTaskExecutor"/>
<property name="step" ref="step1" />
<property name="gridSize" value="10" />
</bean>
<batch:step id="step1">
<batch:tasklet transaction-manager="transactionManager">
<batch:chunk reader="myItemReader"
writer="manipulatableWriterForTests" commit-interval="1"
skip-limit="30000">
<batch:skippable-exception-classes>
<batch:include class="java.lang.Exception" />
</batch:skippable-exception-classes>
</batch:chunk>
</batch:tasklet>
</batch:step>
<!-- scope step is critical here-->
<bean id="myItemReader"
class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
<property name="dataSource" ref="dataSource"/>
<property name="sql">
<value>
<![CDATA[
select * from customers where id >= ? and id <= ?
]]>
</value>
</property>
<property name="preparedStatementSetter">
<bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
<property name="parameters">
<list>
<!-- minValue and maxValue are filled in by Partitioner for each Partition in an ExecutionContext-->
<value>{stepExecutionContext[minValue]}</value>
<value>#{stepExecutionContext[maxValue]}</value>
</list>
</property>
</bean>
</property>
<property name="rowMapper" ref="customerRowMapper"/>
</bean>
パーティショナー.java:
package ...;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
public class ColumnRangePartitioner implements Partitioner {
private String column;
private String table;
public Map<String, ExecutionContext> partition(int gridSize) {
int min = queryForInt("SELECT MIN(" + column + ") from " + table);
int max = queryForInt("SELECT MAX(" + column + ") from " + table);
int targetSize = (max - min) / gridSize;
System.out.println("Our partition size will be " + targetSize);
System.out.println("We will have " + gridSize + " partitions");
Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
int number = 0;
int start = min;
int end = start + targetSize - 1;
while (start <= max) {
ExecutionContext value = new ExecutionContext();
result.put("partition" + number, value);
if (end >= max) {
end = max;
}
value.putInt("minValue", start);
value.putInt("maxValue", end);
System.out.println("minValue = " + start);
System.out.println("maxValue = " + end);
start += targetSize;
end += targetSize;
number++;
}
System.out.println("We are returning " + result.size() + " partitions");
return result;
}
public void setColumn(String column) {
this.column = column;
}
public void setTable(String table) {
this.table = table;
}
}