2

春のバッチリモートパーティショニングを使用しています。以下は私の構成です

<task:executor id="taskExecutor" pool-size="50" />

<rabbit:template id="computeAmqpTemplate"
    connection-factory="rabbitConnectionFactory" routing-key="computeQueue"
    reply-timeout="${compute.partition.timeout}">
</rabbit:template>

<int:channel id="computeOutboundChannel">
    <int:dispatcher task-executor="taskExecutor" />
</int:channel>

<int:channel id="computeInboundStagingChannel" />

<amqp:outbound-gateway request-channel="computeOutboundChannel"
    reply-channel="computeInboundStagingChannel" amqp-template="computeAmqpTemplate"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<beans:bean id="computeMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="computeOutboundChannel"
    p:receiveTimeout="${compute.partition.timeout}" />


<beans:bean id="computePartitionHandler"
    class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
    p:stepName="computeStep" p:gridSize="${compute.grid.size}"
    p:messagingOperations-ref="computeMessagingTemplate" />

<int:aggregator ref="computePartitionHandler"
    send-partial-result-on-expiry="true" send-timeout="${compute.step.timeout}"
    input-channel="computeInboundStagingChannel" />

<amqp:inbound-gateway concurrent-consumers="${compute.consumer.concurrency}"
    request-channel="computeInboundChannel" 
    reply-channel="computeOutboundStagingChannel" queue-names="computeQueue"
    connection-factory="rabbitConnectionFactory"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<int:channel id="computeInboundChannel" />

<int:service-activator ref="stepExecutionRequestHandler"
    input-channel="computeInboundChannel" output-channel="computeOutboundStagingChannel" />

<int:channel id="computeOutboundStagingChannel" />


<beans:bean id="computePartitioner"
    class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
    p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/shares_rics/shares_rics_*.txt"
    scope="step" />



<beans:bean id="computeFileItemReader"
    class="org.springframework.batch.item.file.FlatFileItemReader"
    p:resource="#{stepExecutionContext[fileName]}" p:lineMapper-ref="stLineMapper"
    scope="step" />

<beans:bean id="computeItemWriter"
    class="com.st.batch.foundation.writers.ComputeItemWriter"
    p:symfony-ref="symfonyStepScoped" p:timeout="${compute.item.timeout}"
    p:batchId="#{jobParameters[batch_id]}" scope="step" />


<step id="computeStep">
    <tasklet transaction-manager="transactionManager">
        <chunk reader="computeFileItemReader" writer="computeItemWriter"
            commit-interval="${compute.commit.interval}" />
    </tasklet>
</step>

<flow id="computeFlow">
    <step id="computeStep.master">
        <partition partitioner="computePartitioner"
            handler="computePartitionHandler" />
    </step>
</flow>

<job id="computeJob" restartable="true">
    <flow id="computeJob.computeFlow" parent="computeFlow" />
</job>

問題は、マルチリソース パーティショナーで、パターンを割り当ててファイルを検索し、ファイル数に等しいパーティションを作成することです。ただし、ディレクトリは実行時に条件付きで作成されます。

ディレクトリが存在しない場合(入力ファイルが利用できない場合)、このステップを成功させて、次のステップに進みたいと思います。

現在、ジョブはハングアップするだけで、何もしません。ステップが成功したとは見なされず、例外がスローされるため、失敗することさえありません。このステップではアイドル状態になります。

<beans:bean id="computePartitioner"
    class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
    p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/shares_rics/shares_rics_*.txt"
    scope="step" />

これを処理する方法はありますか? このステップが成功したと見なして、次のステップに進みたいだけです。

更新:

リモートではなくローカルサーバーで実行されるパーティション分割されたステップを作成してテストしたところ、ファイルが存在しない場合、デフォルトでステップが完了としてマークされます。したがって、問題は MultiResourcePartitioner ではなく、パーティション化されたステップを使用して上記の構成のリモート サーバーで実行すると問題が発生します。

ステップ実行メッセージが送信されていないにもかかわらず、応答を待ち続ける集計ロジックだと思いますか?IntegrationMessageHeaderAccessor.SEQUENCE_SIZE ヘッダー属性に依存するデフォルトの SequenceSizeReleaseStrategy が原因で、メッセージがまったくないため、アグリゲーターは SEQUENCE_SIZE にアクセスできませんか?

@MessageEndpoint
public class MessageChannelPartitionHandler implements PartitionHandler {


    public Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter,
            StepExecution masterStepExecution) throws Exception {

        Set<StepExecution> split = stepExecutionSplitter.split(masterStepExecution, gridSize);
        int count = 0;

        if (replyChannel == null) {
            replyChannel = new QueueChannel();
        }//end if 

        for (StepExecution stepExecution : split) {
            Message<StepExecutionRequest> request = createMessage(count++, split.size(), new StepExecutionRequest(
                    stepName, stepExecution.getJobExecutionId(), stepExecution.getId()), replyChannel);
            if (logger.isDebugEnabled()) {
                logger.debug("Sending request: " + request);
            }
            messagingGateway.send(request);
        }

        Message<Collection<StepExecution>> message = messagingGateway.receive(replyChannel);
        if (logger.isDebugEnabled()) {
            logger.debug("Received replies: " + message);
        }
        Collection<StepExecution> result = message.getPayload();
        return result;

    }

    private Message<StepExecutionRequest> createMessage(int sequenceNumber, int sequenceSize,
            StepExecutionRequest stepExecutionRequest, PollableChannel replyChannel) {
        return MessageBuilder.withPayload(stepExecutionRequest).setSequenceNumber(sequenceNumber)
                .setSequenceSize(sequenceSize)
                .setCorrelationId(stepExecutionRequest.getJobExecutionId() + ":" + stepExecutionRequest.getStepName())
                .setReplyChannel(replyChannel)
                .build();
    }
}

ステップ実行リクエストがない場合、つまり分割数が 0 の場合、forloop 内に入らないため、メッセージは送信されませんが、for ループの後でも応答の受信を待機します。解決策は何ですか?

4

2 に答える 2

1

ディサイダーを使用してディレクトリをチェックし、CONTINUE または SKIP (または必要なその他の意味のある値) を返します。

于 2014-07-25T13:05:46.693 に答える
0

Spring バッチ バグ トラッカーの Jira ( https://jira.spring.io/browse/BATCH-2283 ) で問題を提起しました。

現在、このシナリオを処理しない MessageChannelPartitionHandler の問題。一時的な修正は、MessageChannelPartitionHandler.handle をオーバーライドし、メッセージ セットのサイズを確認して、上記のチケットで提案されているように NULL を返すことです。

これがメソッドのあり方です。私にとってはうまくいきました。

public Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter,
            StepExecution masterStepExecution) throws Exception {

        Set<StepExecution> split = stepExecutionSplitter.split(masterStepExecution, gridSize);

        if(split.size() == 0) {
            return null;
        }

        int count = 0;

        if (replyChannel == null) {
            replyChannel = new QueueChannel();
        }//end if 

        for (StepExecution stepExecution : split) {
            Message<StepExecutionRequest> request = createMessage(count++, split.size(), new StepExecutionRequest(
                    stepName, stepExecution.getJobExecutionId(), stepExecution.getId()), replyChannel);
            if (logger.isDebugEnabled()) {
                logger.debug("Sending request: " + request);
            }
            messagingGateway.send(request);
        }

        if (logger.isDebugEnabled()) {
            logger.debug("No message sent but waiting for reply: ");
        }

        Message<Collection<StepExecution>> message = messagingGateway.receive(replyChannel);
        if (logger.isDebugEnabled()) {
            logger.debug("Received replies: " + message);
        }
        Collection<StepExecution> result = message.getPayload();
        return result;

    }
于 2014-08-06T17:47:45.720 に答える