10

Spring バッチ 2.2.1 を使用して、Spring バッチ ジョブを構成し、次のアプローチを使用しました。

構成は次のとおりです。

  • Tasklet は、15 スレッドに制限された ThreadPoolTask​​Executor を使用します

  • スロットル制限はスレッド数と同じです

  • チャンクは以下で使用されます:

    • JdbcCursorItemReader の 1 つの同期アダプター。Spring Batch ドキュメントの推奨に従って、多くのスレッドで使用できるようにします。

      read() への呼び出しを同期することができます。処理と書き込みがチャンクの最も高価な部分である限り、ステップはシングルスレッド構成よりもはるかに速く完了する可能性があります。

    • JdbcCursorItemReader で saveState が false

    • JPA に基づくカスタム ItemWriter。1 つのアイテムの処理は処理時間の点で異なる場合があることに注意してください。数ミリ秒から数秒 (> 60 秒) かかる場合があります。

    • commit-interval を 1 に設定 (改善できることはわかっていますが、問題ではありません)

  • Spring Batch doc の推奨事項に関して、すべての jdbc プールは問題ありません

バッチを実行すると、次の理由により、非常に奇妙で悪い結果が生じます。

  • ある段階で、アイテムがライターによって処理されるのに時間がかかる場合、スレッド プール内のほぼすべてのスレッドが処理する代わりに何もせず、遅いライターだけが機能します。

Spring Batch コードを見ると、根本的な原因は次のパッケージにあるようです。

  • org/springframework/バッチ/繰り返し/サポート/

この作業方法は機能ですか、それとも制限/バグですか?

特徴であるとすれば、すべてを書き直さなくても、長い処理作業に飢えずにすべてのスレッドを作成する構成による方法は何ですか?

すべてのアイテムに同じ時間がかかる場合、すべてが正常に機能し、マルチスレッドは問題ありませんが、アイテムの処理の 1 つに時間がかかる場合は、遅いプロセスが機能している間はマルチスレッドはほとんど役に立ちません。

この問題を開いたことに注意してください:

4

4 に答える 4

5

アレックスが言ったように、この動作は次の javadocs による契約のようです:

サブクラスは、次の結果を取得するメソッド * と、すべての結果が同時プロセスまたはスレッドから返されるのを待つメソッド * を提供する必要があります。

見る:

TaskExecutorRepeatTemplate#waitForResults

別のオプションは、 Partitioning を使用することです。

  • Partitionned ItemReader からアイテムを実行する TaskExecutorPartitionHandler。以下を参照してください。
  • ItemReader によって処理される範囲を提供する Partitioner の実装。以下の ColumnRangePartitioner を参照してください。
  • Partitioner が埋めたものを使用してデータを読み取る CustomReader。以下の myItemReader 構成を参照してください。

Michael Minella は、彼の著書Pro Spring Batchの第 11 章でこれについて説明しています。

<batch:job id="batchWithPartition">
    <batch:step id="step1.master">
        <batch:partition  partitioner="myPartitioner" handler="partitionHandler"/>
    </batch:step>       
</batch:job>
<!-- This one will create Paritions of Number of lines/ Grid Size--> 
<bean id="myPartitioner" class="....ColumnRangePartitioner"/>
<!-- This one will handle every partition in a Thread -->
<bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="multiThreadedTaskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>
<batch:step id="step1">
        <batch:tasklet transaction-manager="transactionManager">
            <batch:chunk reader="myItemReader"
                writer="manipulatableWriterForTests" commit-interval="1"
                skip-limit="30000">
                <batch:skippable-exception-classes>
                    <batch:include class="java.lang.Exception" />
                </batch:skippable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
</batch:step>
 <!-- scope step is critical here-->
<bean id="myItemReader"    
                        class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql">
        <value>
            <![CDATA[
                select * from customers where id >= ? and id <=  ?
            ]]>
        </value>
    </property>
    <property name="preparedStatementSetter">
        <bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
            <property name="parameters">
                <list>
 <!-- minValue and maxValue are filled in by Partitioner for each Partition in an ExecutionContext-->
                    <value>{stepExecutionContext[minValue]}</value>
                    <value>#{stepExecutionContext[maxValue]}</value>
                </list>
            </property>
        </bean>
    </property>
    <property name="rowMapper" ref="customerRowMapper"/>
</bean>

パーティショナー.java:

 package ...;
  import java.util.HashMap;  
 import java.util.Map;
 import org.springframework.batch.core.partition.support.Partitioner;
 import org.springframework.batch.item.ExecutionContext;
 public class ColumnRangePartitioner  implements Partitioner {
 private String column;
 private String table;
 public Map<String, ExecutionContext> partition(int gridSize) {
    int min =  queryForInt("SELECT MIN(" + column + ") from " + table);
    int max = queryForInt("SELECT MAX(" + column + ") from " + table);
    int targetSize = (max - min) / gridSize;
    System.out.println("Our partition size will be " + targetSize);
    System.out.println("We will have " + gridSize + " partitions");
    Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
    int number = 0;
    int start = min;
    int end = start + targetSize - 1;
    while (start <= max) {
        ExecutionContext value = new ExecutionContext();
        result.put("partition" + number, value);
        if (end >= max) {
            end = max;
        }
        value.putInt("minValue", start);
        value.putInt("maxValue", end);
        System.out.println("minValue = " + start);
        System.out.println("maxValue = " + end);
        start += targetSize;
        end += targetSize;
        number++;
    }
    System.out.println("We are returning " + result.size() + " partitions");
    return result;
}
public void setColumn(String column) {
    this.column = column;
}
public void setTable(String table) {
    this.table = table;
}
}
于 2013-08-19T17:17:08.077 に答える
3

これが私が起こっていると思うことです:

  • あなたが言ったように、 ThreadPoolTask​​Executor は15スレッドに制限されています
  • フレームワークの「チャンク」により、 JdbcCursorItemReader 内の各項目 (スレッド制限まで) が別のスレッドで実行される
  • ただし、コミット間隔が 1 の場合、Spring Batch フレームワークは、次のチャンクに移動する前に、各スレッド (つまり、15 個すべて) が個々の読み取り/プロセス/書き込みフローを完了するのも待機しています。スレッドは、完了するのに永遠にかかる兄弟スレッドでほぼ 60 秒待機します。

つまり、Spring Batch のこのマルチスレッド アプローチが役立つためには、各スレッドがほぼ同じ時間で処理する必要があります。特定の項目の処理時間に大きな差があるというシナリオでは、スレッドの多くが完了し、長時間実行されている兄弟スレッドが次の処理チャンクに移動できるようになるまで待機しているという制限が発生しています。

私のおすすめ:

  • 一般に、スレッドの 1 つが長時間の書き込みでスタックしている場合でも、コミット間の単一のスレッドで複数のカーソル項目を処理できるようにするため、コミット間隔を長くすると多少は役立つはずです。ただし、運が悪いと、複数の長いトランザクションが同じスレッドで発生し、事態を悪化させる可能性があります (たとえば、コミット間隔 2 の単一スレッドでのコミット間に 120 秒)。
  • 具体的には、最大データベース接続数の 2 倍または 3 倍を超えても、スレッド プールのサイズを大きくすることをお勧めします。スレッド プールのサイズが大きいため、一部のスレッドが接続を取得しようとしてブロックされたとしても、実行時間の長いスレッドが他のスレッドを停止しなくなるため、実際にはスループットが向上します。カーソルから新しいアイテムを取得し、その間にバッチ ジョブの作業を続行します (チャンクの開始時に、保留中のスレッドの数が、使用可能なデータベース接続の数を大幅に超えます。したがって、OS スケジューラは、スレッドをアクティブにするときに少しチャーンします。データベース接続の取得時にブロックされ、スレッドを非アクティブ化する必要があります。
于 2013-08-19T13:50:49.757 に答える
1

私の場合、スロットル制限を設定しないと、Spring Batch のドキュメントに従って tasklet タグで指定されていない場合、スレッドのデフォルト数でもある ItemReader の read() メソッドには 4 つのスレッドしか入りません。

10 または 20 または 100 など、より多くのスレッドを指定すると、ItemReader の read() メソッドには 8 つのスレッドしか入りません。

于 2013-09-21T06:07:39.057 に答える
1

スロットル制限の値に関係なく、8 つのアクティブ スレッドの制限は、Spring Batch ジョブ リポジトリでの競合が原因である可能性があります。チャンクが処理されるたびに、いくつかの情報がジョブ リポジトリに書き込まれます。必要なスレッド数に対応できるようにプール サイズを増やしてください。

于 2016-01-08T14:44:44.913 に答える