3

一連のメッセージを同時に処理したいのですが、VMをrequest-responseに設定しない限り、メッセージをトランザクション化することはできません...その場合、処理は同時ではありません。

Muleのドキュメントには、「Muleトランザクションは同期エンドポイントで構成されている」と記載されていますが、この制限についてはよくわかりません。
トランザクションになりたいフロー内で非同期フローを生成するべきではないことは明らかですが、(非txメインフローから)それぞれが任意の数の非同期フローを開始できない理由は(私には)明確ではありませんトランザクション。

言い換えれば、なぜこれがうまく機能するのですか?

ここに画像の説明を入力してください

しかし、VMを「一方向」に変更すると、次のように失敗します。

org.mule.transaction.IllegalTransactionStateException: Can only bind "javax.sql.DataSource/java.sql.Connection" type resources

これを回避する方法はありますか?

フローのXML:

<?xml version="1.0" encoding="UTF-8"?>
<mule>
    <spring:beans>
        <spring:bean id="dataSource" name="dataSource" class="org.enhydra.jdbc.standard.StandardDataSource" destroy-method="shutdown">
            <spring:property name="driverName" value="org.h2.Driver" />
            <spring:property name="url" value="jdbc:h2:tcp://localhost/~/mule" />
            <spring:property name="user" value="sa" />
            <spring:property name="password">
                <spring:value></spring:value>
            </spring:property>
        </spring:bean>

        <spring:bean id="transactionFactory" name="transactionFactory" class="org.mule.transport.jdbc.JdbcTransactionFactory" />

    </spring:beans>

    <jdbc:connector name="dbConnector" dataSource-ref="dataSource" validateConnections="true" queryTimeout="-1" pollingFrequency="0" doc:name="Database" />

    <flow name="triggerFlow" doc:name="triggerFlow">
        <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" doc:name="HTTP" />
        <set-payload value="#[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]" doc:name="[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]"/>
        <collection-splitter doc:name="Collection Splitter"/>
        <vm:outbound-endpoint exchange-pattern="request-response" path="flow" doc:name="flow" />
    </flow>

    <flow name="flow" doc:name="flow">
        <vm:inbound-endpoint exchange-pattern="request-response" path="flow" doc:name="flow">
            <custom-transaction factory-ref="transactionFactory" action="ALWAYS_BEGIN" timeout="10" />
        </vm:inbound-endpoint>
        <logger message="#[payload]" level="INFO" doc:name="Logger"/>
        <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="insert" queryTimeout="-1" connector-ref="dbConnector" doc:name="Insert 1">
            <jdbc:transaction action="ALWAYS_JOIN" />
            <jdbc:query key="insert" value="insert into test values (#[payload], 'Test 1')" />
        </jdbc:outbound-endpoint>
        <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="insert2" queryTimeout="-1" connector-ref="dbConnector" doc:name="Insert 2">
            <jdbc:transaction action="ALWAYS_JOIN" />
            <jdbc:query key="insert2" value="insert into test values (#[payload + 10], 'Test 2')" />
        </jdbc:outbound-endpoint>
    </flow>

</mule>

前もって感謝します。

4

3 に答える 3

3

Muleのドキュメントには、「Muleトランザクションは同期エンドポイントで構成されている」と記載されていますが、この制限についてはよくわかりません。

この制限は、SpringのようにMuleで、より一般的にはJavaで、トランザクションがスレッドにバインドされているという事実によるものです。非同期フローでは、複数のスレッドが関係するため、トランザクションとスレッドの関連付けを維持できません。

したがって、メッセージを分割/フォーク/並列化/非同期処理したり、Muleでトランザクションを実行したりすることはできません。

org.mule.transaction.IllegalTransactionStateException:「javax.sql.DataSource/java.sql.Connection」タイプのリソースのみをバインドできます

ただし、これは最初の質問IMOとは関係ありません。これは、を介してcustom-transactionVMエンドポイントをJDBCトランザクションに強制的に登録しようとするためです。これは機能しません。異種リソースを登録する場合は、XAトランザクションを使用します。

編集:コメントで述べたことから、VMエンドポイントをトランザクションに登録したくないので、次のようにJDBCエンドポイントを登録するだけです。

<transactional action="ALWAYS_BEGIN">
    <jdbc:outbound-endpoint exchange-pattern="request-response"
        queryKey="insert" queryTimeout="-1" connector-ref="dbConnector"
        doc:name="insert into test values (1, 'Test 1')">
        <jdbc:transaction action="ALWAYS_JOIN" />
        <jdbc:query key="insert"
            value="insert into test values (1, 'Test 1')" />
    </jdbc:outbound-endpoint>
    <jdbc:outbound-endpoint exchange-pattern="request-response"
        queryKey="insert2" queryTimeout="-1" connector-ref="dbConnector"
        doc:name="insert into test values (2, 'Test 2')">
        <jdbc:transaction action="ALWAYS_JOIN" />
        <jdbc:query key="insert2"
            value="insert into test values (2, 'Test 2')" />
    </jdbc:outbound-endpoint>
</transactional>

これは、one-wayインバウンドエンドポイントで正常に機能します。

于 2013-01-11T01:47:06.723 に答える
1

私はなんとかそれを機能させることができました。sync/txフローを呼び出す中間非同期フローを追加する必要がありました。

ここに画像の説明を入力してください

これは醜くて不必要であり、元の投稿として呼び出すことはまったく問題ないと思いますが、私を超えた理由で、Muleはあなたにこれのためにフープを飛び越えさせます。

フローのXMLは次のとおりです。

<?xml version="1.0" encoding="UTF-8"?>
    <mule>
    <spring:beans>
        <spring:bean id="dataSource" name="dataSource" class="org.enhydra.jdbc.standard.StandardDataSource" destroy-method="shutdown">
            <spring:property name="driverName" value="org.h2.Driver" />
            <spring:property name="url" value="jdbc:h2:tcp://localhost/~/mule" />
            <spring:property name="user" value="sa" />
            <spring:property name="password">
                <spring:value></spring:value>
            </spring:property>
        </spring:bean>

        <spring:bean id="transactionFactory" name="transactionFactory" class="org.mule.transport.jdbc.JdbcTransactionFactory" />

    </spring:beans>

    <jdbc:connector name="dbConnector" dataSource-ref="dataSource" validateConnections="true" queryTimeout="-1" pollingFrequency="0" doc:name="Database" />

    <flow name="triggerFlow" doc:name="triggerFlow">
        <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" doc:name="HTTP" />
        <set-payload value="#[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]" doc:name="[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]"/>
        <collection-splitter doc:name="Collection Splitter"/>
        <vm:outbound-endpoint exchange-pattern="one-way" path="async" doc:name="async" />
    </flow>
    <flow name="simpletransactionFlow1" doc:name="simpletransactionFlow1">
        <vm:inbound-endpoint exchange-pattern="one-way" path="async" doc:name="async"/>
        <vm:outbound-endpoint exchange-pattern="request-response" path="flow" doc:name="flow"/>
    </flow>

    <flow name="flow" doc:name="flow">
        <vm:inbound-endpoint exchange-pattern="request-response" path="flow" doc:name="flow">
            <custom-transaction factory-ref="transactionFactory" action="ALWAYS_BEGIN" timeout="10" />
        </vm:inbound-endpoint>
        <logger message="#[groovy:Thread.currentThread().getName()], payload=#[payload]" level="INFO" doc:name="Logger"/>
        <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="insert" queryTimeout="-1" connector-ref="dbConnector" doc:name="Insert 1">
            <jdbc:transaction action="ALWAYS_JOIN" />
            <jdbc:query key="insert" value="insert into test values (#[payload], 'Test 1')" />
        </jdbc:outbound-endpoint>
        <jdbc:outbound-endpoint exchange-pattern="request-response" queryKey="insert2" queryTimeout="-1" connector-ref="dbConnector" doc:name="Insert 2">
            <jdbc:transaction action="ALWAYS_JOIN" />
            <jdbc:query key="insert2" value="insert into test values (#[payload + 10], 'Test 2')" />
        </jdbc:outbound-endpoint>
    </flow>

</mule>
于 2013-01-11T17:07:02.547 に答える
0

これについては、Muleのドキュメントで説明されています。おそらく必要になるのはもう少し面倒ですが、それほど悪くはなく、実際に必要なのは2つではなく1つのVMトランスポートだけです。

ここに画像の説明を入力してください

HTTPの代わりにTCPを使いたかったので変更しましたが、基本的には同じ例です。

最初のフローは、トランザクションされていないTCPを受信し、それを使用して何かを実行し(ここでは、バイト配列から文字列までですが、検証などがある可能性があります)、入力をVMコネクタにプッシュしてから、入力をTCPストリームにエコーバックします。VMコネクタの後に戻ると、TCPソースがメッセージを解放できることが保証されます。これは、少し低レベルの保証された配信を実行している場合です。

VMコネクタは一方向であり、永続ストアが接続されているため、メッセージが失われることはありません。これはMuleStudio3.5のエラーを示していますが、正常に動作します。

次に、中間フローが引き継ぎ、トランザクションが含まれるため、ビジネスロジックサブフローが正常に完了しない限り、VMコネクタはメッセージを解放しません。

最後に、サブフローが実行されます。現在、5秒間Thread.sleep()を実行しているだけなので、すべてが機能していることがわかります。Telnetで接続すると、Telnetコンソールに瞬時にエコーバックされ、5秒後に別のエコーが返されます。

お役に立てば幸いです。私はMuleと一緒に数時間しか過ごしていませんが、これは正しいようです。

于 2013-09-23T09:44:40.930 に答える