0

バッチ処理に春のバッチリモートパーティショニングを使用しています。春のバッチ管理を使用してジョブを起動しています。

インバウンド ゲートウェイのコンシューマ コンカレンシー ステップを 10 に設定しましたが、並行して実行されるパーティションの最大数は 8 です。

後で消費者の同時実行数を 15 に増やしたいと考えています。

以下は私の構成です、

<task:executor id="taskExecutor" pool-size="50" />

<rabbit:template id="computeAmqpTemplate"
    connection-factory="rabbitConnectionFactory" routing-key="computeQueue"
    reply-timeout="${compute.partition.timeout}">
</rabbit:template>

<int:channel id="computeOutboundChannel">
    <int:dispatcher task-executor="taskExecutor" />
</int:channel>

<int:channel id="computeInboundStagingChannel" />

<amqp:outbound-gateway request-channel="computeOutboundChannel"
    reply-channel="computeInboundStagingChannel" amqp-template="computeAmqpTemplate"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<beans:bean id="computeMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="computeOutboundChannel"
    p:receiveTimeout="${compute.partition.timeout}" />


<beans:bean id="computePartitionHandler"
    class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
    p:stepName="computeStep" p:gridSize="${compute.grid.size}"
    p:messagingOperations-ref="computeMessagingTemplate" />

<int:aggregator ref="computePartitionHandler"
    send-partial-result-on-expiry="true" send-timeout="${compute.step.timeout}"
    input-channel="computeInboundStagingChannel" />

<amqp:inbound-gateway concurrent-consumers="${compute.consumer.concurrency}"
    request-channel="computeInboundChannel" 
    reply-channel="computeOutboundStagingChannel" queue-names="computeQueue"
    connection-factory="rabbitConnectionFactory"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<int:channel id="computeInboundChannel" />

<int:service-activator ref="stepExecutionRequestHandler"
    input-channel="computeInboundChannel" output-channel="computeOutboundStagingChannel" />

<int:channel id="computeOutboundStagingChannel" />

<beans:bean id="computePartitioner"
    class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
    p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/shares_rics/shares_rics_*.txt"
    scope="step" />



<beans:bean id="computeFileItemReader"
    class="org.springframework.batch.item.file.FlatFileItemReader"
    p:resource="#{stepExecutionContext[fileName]}" p:lineMapper-ref="stLineMapper"
    scope="step" />

<beans:bean id="computeItemWriter"
    class="com.st.batch.foundation.writers.ComputeItemWriter"
    p:symfony-ref="symfonyStepScoped" p:timeout="${compute.item.timeout}"
    p:batchId="#{jobParameters[batch_id]}" scope="step" />


<step id="computeStep">
    <tasklet transaction-manager="transactionManager">
        <chunk reader="computeFileItemReader" writer="computeItemWriter"
            commit-interval="${compute.commit.interval}" />
    </tasklet>
</step>

<flow id="computeFlow">
    <step id="computeStep.master">
        <partition partitioner="computePartitioner"
            handler="computePartitionHandler" />
    </step>
</flow>

<job id="computeJob" restartable="true">
    <flow id="computeJob.computeFlow" parent="computeFlow" />
</job>



compute.grid.size = 112
compute.consumer.concurrency = 10

Input files are splited to 112 equal parts = compute.grid.size = total number of partitions

Number of servers = 4.

2つの問題があり、

i) 同時実行数を 10 に設定しましたが、実行中のスレッドの最大数は 8 です。

ii)

他のプロセスが実行されると遅いものもあれば速いものもあるので、ステップの実行が公平に分散されるようにします。つまり、より高速なサーバーが実行を完了した場合、キュー内の残りの実行はそれらに移動する必要があります。ラウンド ロビン方式で配布しないでください。

私はrabbitmqに、運賃を分配するためのプリフェッチカウント設定とackモードがあることを知っています。Spring 統合の場合、プリフェッチ カウントはデフォルトで 1 であり、ack モードはデフォルトで AUTO です。しかし、他のサーバーが長時間稼働しているにもかかわらず、一部のサーバーはさらに多くのパーティションを実行し続けています。アイドル状態のサーバーがないことが理想的です。

アップデート:

私が今観察したもう 1 つのことは、(リモート パーティショニングを使用して分散されていない) 分割を使用して並列に実行されるいくつかのステップでも、最大 8 並列で実行されることです。スレッド プール制限の問題のように見えますが、ご覧のとおり、taskExecutor のプール サイズは 50 に設定されています。

spring-batch/spring-batch-admin に同時実行ステップ数を制限するものはありますか?

2 回目の更新:

また、並列処理アイテムで実行されているスレッドが 8 つ以上ある場合、Spring バッチ管理は読み込まれません。ハングするだけです。同時実行数を減らすと、Spring Batch Admin が読み込まれます。あるサーバーで同時実行数を 4 に設定し、別のサーバーで 8 を設定してテストしたところ、Spring バッチ管理者はそれをロードしません。8 つのスレッドが実行されているサーバーの URL を使用していますが、4 つのスレッドが実行されているサーバーで動作します。

Spring バッチ管理マネージャーには、以下の jobLauncher 構成があり、

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
    <property name="taskExecutor" ref="jobLauncherTaskExecutor" />
</bean>

<task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />

プールサイズは6ですが、上記の問題と関係がありますか?

または、実行中のスレッド数を 8 に制限する tomcat 7 には何かありますか?

4

2 に答える 2