1

並列処理を試みている SpringBatch アプリケーションがあります。バッチでは、テーブルから読み取り、応答で別のテーブルを更新します。入力テーブルに 100 レコードある場合、出力テーブルにも 100 レコードが必要です。

現在、入力テーブルには 13600 のレコードがあります。で試してみたところSyncTaskExecutor、実行中のスレッドは 1 つだけで、出力テーブルには 13600 レコードがありました。で試してみたところSimpleAsyncTaskExecutor、出力テーブルには 900 レコードしかありませんでした。

以下のジョブ宣言:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd">

<import resource="applicationContext.xml" />


<bean id="itemReader"
    class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="sql" value="select REP_QMUT_KEY, DLN_DLNRNR, DLN_AFVDAT, F_IND_MEMO_DVB, MUT_MUTDAT_UM, MUT_VERWDAT_UM, MUT_SRT_MUT_UM from REP_QMUT" />
    <property name="rowMapper">
        <bean class="com.aegon.quinto.service.mapper.MutationInputRowMapper" />
    </property>
</bean>
<bean id="simpleStep"
    class="org.springframework.batch.core.step.item.SimpleStepFactoryBean">
    <property name="transactionManager" ref="transactionManager" />
    <property name="jobRepository" ref="jobRepository" />
    <property name="itemReader" ref="itemReader" />
    <property name="itemWriter" ref="itemWriter" />
    <property name="commitInterval" value="10" />
    <property name="startLimit" value="1" />
</bean>

 <bean id="itemWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter">
    <property name="dataSource" ref="dataSource" />
    <property name="itemSqlParameterSourceProvider">
        <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
    </property>
    <property name="sql" value="INSERT INTO MUT_TRIAL(DLNRNR, AFVDAT,  MEMO_MUTATION, MEMO_PARTICIPANT, MUTATION_DATE, PROCESSING_DATE, RUN_NR, SRT_MUT, REP_QMUT_CORTICON_KEY) VALUES (:dlnrnr,:afvDat,:memo,:participantMemo,:mutationDate,:processDate,:runNr,:mutationType,:mutationKey)" />
    </bean>

<bean id="simpleChunkListner" class="com.aegon.quinto.service.listener.SimpleChunkListener" />

<bean id="taskExecutor" class="org.springframework.core.task.SyncTaskExecutor" />
<bean id="itemProcessor" class="com.aegon.quinto.service.processor.SimpleItemProcessor" />

<!-- job id="simpleJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="simpleStep">
        <tasklet>
            <chunk reader="itemReader"  writer="itemWriter"
                commit-interval="50">
            </chunk>
        </tasklet>
    </step>

</job-->

<job id="simpleJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="simpleStep">
        <tasklet task-executor="taskExecutor" throttle-limit="25">
            <chunk reader="itemReader"  processor="itemProcessor" writer="itemWriter"
                commit-interval="50">
            </chunk>
        </tasklet>
    </step>

</job>


<!--  For running the BatchLauncher -->
<bean id="batchLauncher" class="com.aegon.quinto.service.BatchLauncher">
    <property name="jobLauncher" ref="jobLauncher" />
    <property name="jobRepository" ref="jobRepository" />
    <property name="job" ref="simpleJob" />
</bean>
</beans>

複数のスレッドでステップを実行しようとしています

マッパー:

import java.sql.ResultSet;

import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

com.aegon.quinto.model.MutationInput をインポートします。

public class MutationInputRowMapper は RowMapper を実装します {

public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
    // TODO Auto-generated method stub
    MutationInput mutationInput = new MutationInput();
    mutationInput.setMutationKey(rs.getInt("REP_QMUT_KEY"));
    mutationInput.setDlnrnr(rs.getString("DLN_DLNRNR"));
    mutationInput.setMemo(rs.getString("F_IND_MEMO_MVM"));
    mutationInput.setParticipantMemo(rs.getString("F_IND_MEMO_DVB"));
    mutationInput.setProcessDate(rs.getInt("MUT_VERWDAT_UM"));
    mutationInput.setRunNr(new Integer("2"));
    mutationInput.setMutationType(rs.getString("MUT_SRT_MUT_UM"));

    return mutationInput;
}

}

私の全体的な要件は次のとおりです。入力テーブルからデータを読み取り、外部サービスでデータを検証し、出力テーブルで検証応答を更新します。入力テーブルでは、データはフラットな構造になります。つまり、学生の場合、複数の試験の試験結果が存在する可能性があります。外部サービスにアクセスする前に、その参加者のすべての試験結果を取得する必要があります。外部サービスとの通信は、ネットワーク遅延のためにボトルネックになります。したがって、マルチスレッドが必要です。サンプル実装/ガイドがある場合は、その方法を教えてください。PS: 私は SpringBatch の初心者です。

4

1 に答える 1

1

1 つ以上のコンポーネントがスレッドセーフではないようです

于 2012-07-05T11:18:48.657 に答える