1

ファイルをレコードに分割する Mule 3.3.0 フローがあります。すべてのレコードの処理が終了した後に、アクション (ストアド プロシージャ) を実行する必要があります。問題は、すべてのレコードが Mule によって処理される前にアクションが実行されることがあることです。これは、Mule が処理を並行して行うという事実によるものだと思います。これは素晴らしいことですが、最終アクションがあまりにも早く呼び出されることがあります。フローを同期として設定すると、機能しているように見えますが、並列実行を利用していません。Foreachスコープも使用できると思いますが(試していません)、まだ並列化されていないと思います。すべてのレコードの処理が完了するまで「待機」する方法はありますか?

この動作を示す非常に単純なフローを添付します。実行すると、ロガーが順番どおりに出力しないことがわかります。実際には、「DONE」メッセージが残りの前にログに記録されます。フローは、単純な csv ファイルを、値が「end」のフィールドに一致するまで処理します。そのようなフィールドが見つかったときに「DONE」をログに記録する選択コンポーネントがあります。残りのフィールドは単にログに記録されます。

どんな助けでも大歓迎です。

フロー:

ここに画像の説明を入力

フロー XML

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:file="http://www.mulesoft.org/schema/mule/file"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans" version="CE-3.3.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd 
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd 
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd 
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd 
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd ">

<file:connector name="inputFileConnector" autoDelete="true"
    streaming="false" validateConnections="true" doc:name="File" fileAge="60000"
    readFromDirectory="#{systemProperties['user.home']}" />

<flow name="flow1" doc:name="flow1" processingStrategy="synchronous">
    <file:inbound-endpoint path="#{systemProperties['user.home']}"
        responseTimeout="10000" doc:name="Input File" fileAge="100"
        connector-ref="inputFileConnector">
        <file:filename-regex-filter pattern="input.csv"
            caseSensitive="false" />
    </file:inbound-endpoint>
    <byte-array-to-string-transformer
        doc:name="Byte Array to String" />
    <scripting:component doc:name="Groovy">
        <scripting:script engine="Groovy">
            <scripting:text><![CDATA[return payload.split('\n');]]></scripting:text>
        </scripting:script>
    </scripting:component>
    <collection-splitter doc:name="Collection Splitter" />
    <choice doc:name="Choice">
        <when expression="#[groovy:payload != 'end']">
            <processor-chain>
                <logger message="." level="INFO" doc:name="Process"/>
                <vm:outbound-endpoint path="toFlow2" doc:name="VM"/>
            </processor-chain>
        </when>
        <otherwise>
            <processor-chain>
                <logger message="|||| DONE" level="INFO" doc:name="DONE"/>
            </processor-chain>
        </otherwise>
    </choice>
</flow>

<flow name="flow2" doc:name="flow2" >
    <vm:inbound-endpoint path="toFlow2" doc:name="VM"/>
    <scripting:component doc:name="Groovy">
        <scripting:script engine="Groovy">
            <scripting:text><![CDATA[return payload.split(',');]]></scripting:text>
        </scripting:script>
    </scripting:component>
    <collection-splitter doc:name="Collection Splitter" />
    <logger message="|||||| #[payload]" level="INFO" doc:name="Logger"/>
    <vm:outbound-endpoint path="toFlow3" doc:name="VM"/>
</flow>

4

1 に答える 1

1

1つのオプションは、を使用しcollection-aggregatorてアキュムレータとして機能し、すべてのメッセージが処理されるまで最終フローアクションをブロックすることです。秘訣は、collection-splittersが、ファイル内の行数またはファイル内の列数のいずれかにのみ適した相関グループサイズを設定することです。ただし、すべての行のすべての列が処理されるまで累積します。解決策は、最初にこの値(つまり、予想されるメッセージの総数)を計算し、計算された相関グループのサイズを合計値でオーバーライドすることcollection-splitterです。

これが私がこれを行った方法です(すべてのGroovyスニペットをより多くのMule-3風のMEL式に置き換えたことに注意してください):

<file:connector name="inputFileConnector" autoDelete="true"
    streaming="false" validateConnections="true" fileAge="60000"
    readFromDirectory="#{systemProperties['user.home']}" />

<flow name="flow1" processingStrategy="synchronous">
    <file:inbound-endpoint path="#{systemProperties['user.home']}"
        responseTimeout="10000" fileAge="100"
        connector-ref="inputFileConnector">
        <file:filename-regex-filter pattern="input.csv"
            caseSensitive="false" />
    </file:inbound-endpoint>
    <byte-array-to-string-transformer />
    <set-session-variable variableName="expectedMessageCount"
                          value="#[org.mule.util.StringUtils.countMatches(message.payload, '\n') + org.mule.util.StringUtils.countMatches(message.payload, ',') - 1]" />
    <expression-transformer expression="#[message.payload.split('\n')]" />
    <collection-splitter enableCorrelation="IF_NOT_SET" />
    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
                  value="#[sessionVars.expectedMessageCount]" />
    <choice>
        <when expression="#[message.payload != 'end']">
            <processor-chain>
                <logger message="." level="INFO" />
                <vm:outbound-endpoint path="toFlow2" />
            </processor-chain>
        </when>
        <otherwise>
            <processor-chain>
                <logger message="|||| END" level="INFO" />
            </processor-chain>
        </otherwise>
    </choice>
</flow>

<flow name="flow2">
    <vm:inbound-endpoint path="toFlow2"/>
    <expression-transformer expression="#[message.payload.split(',')]" />
    <collection-splitter />
    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
                  value="#[sessionVars.expectedMessageCount]" />
    <logger message="|||||| #[message.payload]" level="INFO"/>
    <vm:outbound-endpoint path="toFinalizer" />
    <vm:outbound-endpoint path="toFlow3" />
</flow>

<flow name="finalizer">
    <vm:inbound-endpoint path="toFinalizer" />
    <collection-aggregator />
    <logger message="|||| DONE" level="INFO" />
</flow>

NB。collection-aggregatorまたは、メモリの使用量が多すぎるためにaの使用が問題になる場合は、式コンポーネントを使用してデクリメントsessionVars.expectedMessageCountとフィルタリングを行い、カウンタが0に戻ったときにメッセージが最終的なメッセージプロセッサにヒットするようにすることができます。

于 2012-10-08T18:38:20.607 に答える