以下では、Spring Batch ジョブのコミット間隔が 1000 に設定されているため、書き込みは一度に 1000 ずつデータベースにコミットされていますか? MyBatis SqlSessionFactory は、BATCH 実行として定義されています。
<bean id="sqlSession" class="org.mybatis.spring.SqlSessionTemplate">
<constructor-arg index="0" ref="sqlSessionFactory" />
<constructor-arg index="1" value="BATCH" />
</bean>
<!-- define the SqlSessionFactory -->
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource" />
<property name="typeAliasesPackage" value="org.my.domain" />
</bean>
DEBUG ログで次のことに気付きました。
2015-08-21 22:58:54,632 [main] DEBUG org.mybatis.spring.SqlSessionUtils - Fetched SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@174b870c] from current transaction
上記の DEBUG ステートメントでは、接続を開いていると思います。次に、insert ステートメントの下に 1,000 の 1 つのバッチが挿入されます。または、挿入ごとに MS SQL サーバーへの新しい接続が開かれましたか?
2015-08-21 22:58:54,632 [main] DEBUG org.mybatis.spring.SqlSessionUtils - Fetched SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@174b870c] from current transaction
KickoutMapper.insertKickoutTbl - ==> Parameters: 12143(Long), 10039(Long), 0(Integer), SUBSCRIBER4998(String), .....
2015-08-21 22:58:54,632 [main] DEBUG org.mybatis.spring.SqlSessionUtils - Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@174b870c]
最後に、トランザクションを閉じています(また、MS SQLサーバーへの接続を閉じているのか、挿入ごとにそれを行っていたのかはわかりません)。ここでも、データベースへの 1 つの接続を使用して、1,000 レコードをバッチで挿入したいと考えています。
2015-08-21 22:58:55,376 [main] DEBUG org.mybatis.spring.SqlSessionUtils - Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@174b870c]
また、代わりに AsyncItemWriter を JdbcBatchItemWriter に委任できますか?設定した MyBatis よりも高速でしょうか? 以下のコードでは、MyBatis マッパーを使用して 2 つの個別のテーブルに書き込む CompositeItemWriter にデリゲートします。
私の使用例は次のとおりです。mec_mdw データベース テーブルから 700 万件のレコードを読み取って検証する必要があります。
プロセッサによって無効と見なされる無効な mec_mdw レコードは、mec_kickout テーブルに挿入されます。また、その特定のレコードの検証失敗の理由が mec_kickout_reason テーブルに挿入されます。最初に読み取った mec_mdw テーブルにも、処理の最後にいくつかの列が更新されます。
これは私がこれまでに非同期処理と書き込みを使用して行ったことです。現在、読み取られた mdw レコードを 2,500 のみでテストしています。このテストでは、2,500 レコードすべてが意図的に無効であるため、mec_kickout テーブルに挿入されています。 mdw テーブル 2,500 行も更新されています。これはすべて、16 Gb の RAM を搭載した 8 コア CPU のラップトップで約 50 秒で実行され、MS SQL サーバー データベースはネットワーク コールで接続できます。しかし、それがより速くできるかどうかはまだ完全にはわかりません。
<job id="mecmdwvalidatorJob" xmlns="http://www.springframework.org/schema/batch">
<step id="mdwvalidatorStep1">
<tasklet>
<chunk reader="pageItemReader" processor="asyncItemProcessor"
writer="asynchItemWriter" commit-interval="1000" skip-limit="2147483647">
<skippable-exception-classes> <!-- TODO -->
<include class="java.lang.Exception" />
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
</job>
<bean id="pageItemReader"
class="org.springframework.batch.item.database.JdbcPagingItemReader">
<property name="dataSource" ref="dataSource" />
<property name="queryProvider">
<bean
class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
<property name="dataSource" ref="dataSource" />
<property name="selectClause"
value="select MDW_ID,FK_LOG_FILE_ID,TAX_YEAR,SUBS_TYPE_CD,SUB_FIRST_NM,SUB_MIDDLE_NM,SUB_LAST_NM,SUB_SUFFIX,SUB_DOB,SUB_ADDR1,SUB_ADDR2,SUB_CITY,SUB_STATE,SUB_PROVINCE,SUB_ZIP,SUB_ZIP4,SUB_COUNTRY_CD,SUB_COUNTRY,SUB_F_POSTAL_CD,LOB,SUB_SSN,GRP_EMP_NAME1,GRP_EMP_NAME2,GRP_EIN,GRP_ADDR1,GRP_ADDR2,GRP_CITY,GRP_STATE,GRP_PROVINCE,GRP_ZIP,GRP_ZIP4,GRP_COUNTRY_CD,GRP_COUNTRY,GRP_F_POSTAL_CD,ISSUER_NAME1,ISSUER_NAME2,ISSUER_PHONE,ISSUER_ADDR1,ISSUER_ADDR2,ISSUER_CITY,ISSUER_PROVINCE,ISSUER_ZIP,ISSUER_ZIP4,ISSUER_COUNTRY_CD,ISSUER_COUNTRY,ISSUER_F_POSTAL_CD,MEM_FIRST_NM,MEM_MIDDLE_NM,MEM_LAST_NM,MEM_SUFFIX,MEM_SSN,MEM_DOB,MEM_START_DATE,MEM_END_DATE,REGION_CD,SUB_MRN,SUB_MRN_PREFIX,MEM_MRN,MRN_PREFIX,PID,SUB_GRP_ID,SUB_GRP_NAME,INVALID_ADDR_FL" />
<property name="fromClause"
value="from MEC_MDW JOIN MEC_FILE_LOG on MEC_FILE_LOG.LOG_FILE_ID=MEC_MDW.FK_LOG_FILE_ID " />
<property name="whereClause" value="where MEC_FILE_LOG.STATUS=:status" />
<property name="sortKey" value="MDW_ID" />
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="READY TO VALIDATE" />
</map>
</property>
<property name="pageSize" value="1000" />
<property name="rowMapper" ref="mdwRowMapper" />
</bean>
<bean id="mdwRowMapper" class="org.my.rowmapper.MdwRowMapper" />
<bean id="asyncItemProcessor"
class="org.springframework.batch.integration.async.AsyncItemProcessor">
<property name="delegate">
<bean
class="org.my.itemprocessor.MdwValidatingItemProcessor">
<property name="validator">
<bean
class="org.springframework.validation.beanvalidation.LocalValidatorFactoryBean" />
</property>
</bean>
</property>
<property name="taskExecutor" ref="taskExecutor" />
<!-- <property name="taskExecutor"> -->
<!-- <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"
/> -->
<!-- </property> -->
</bean>
<task:executor id="taskExecutor" pool-size="10" />
<bean id="asynchItemWriter"
class="org.springframework.batch.integration.async.AsyncItemWriter">
<property name="delegate" ref="customerCompositeWriter">
</property>
</bean>
<bean id="customerCompositeWriter"
class="org.springframework.batch.item.support.CompositeItemWriter">
<property name="delegates">
<list>
<ref bean="itemWriter1" />
<ref bean="itemWriter2" />
</list>
</property>
</bean>
<bean id="itemWriter1" class="org.my.writer.MdwWriter" />
<bean id="itemWriter2" class="org.my.writer.KickoutWriter" />
</beans>
プロセッサにはビジネス ロジックが含まれますが、現時点では、MecMdw ドメイン オブジェクトで null プロパティを検索する Bean 検証のみが含まれます。最終的には、他のテーブルでアカウント ID を検索するためのアダプター コードも必要になります (データベース接続の喜びが増します!)。このアダプターの DAO ロジックはプロセッサに組み込まれると思います
public class MdwValidatingItemProcessor implements ItemProcessor<MecMdw, MecMdw> {
private Validator validator;
public void setValidator(Validator validator) {
this.validator = validator;
}
public MecMdw process(MecMdw item) throws Exception {
BindingResult results = BindAndValidate(item);
if (results.hasErrors()) {
item.setKick_out_fl('Y');
buildValidationException(results,item);
return item;
}
return item;
}
private BindingResult BindAndValidate(MecMdw item) {
DataBinder binder = new DataBinder(item);
binder.setValidator(validator);
binder.validate();
return binder.getBindingResult();
}
private void buildValidationException(BindingResult results, MecMdw item) {
List<String> listOfErrors = new ArrayList<String>();
for (ObjectError error : results.getAllErrors()) {
listOfErrors.add(error.toString());
}
item.setValidationErrors(listOfErrors);
}
MdwWriter と KickoutWriter は、MyBatis DAO を使用してデータベースに書き込みます。
public class MdwWriter<MecMdw> implements ItemWriter<MecMdw> {
@Autowired
MdwMapper mdwMapper;
@Override
public void write(List<? extends MecMdw> items) throws Exception {
for(MecMdw item : items){
mdwMapper.setMecMdwRecordAsKickOut((org.my.domain.MecMdw) item);
}
}
ここに KickoutWriter.java があります
public class KickoutWriter<MecMdw> implements ItemWriter<MecMdw> {
@Autowired
KickoutMapper kickoutMapper;
@Override
public void write(List<? extends MecMdw> items) throws Exception {
for(MecMdw item : items){
kickoutMapper.insertKickoutTbl((org.my.domain.MecMdw) item);
}
}