0

春の統合を使用して、 JMS に裏打ちされたイベント駆動型の方法で Gateway --> Splitter --> ServiceActivator --> Aggregator Pattern を実行しようとしています。サービス アクティベーターがマルチスレッド化され、任意のエンド ポイントがクラスター上で実行できることを期待していますが、元のサーバーである必要はありません。JMS を使用せずに (SI チャネルを使用して) 単一の JVM でこれを機能させることはできますが、SI チャネルは水平方向のスケーリング、つまり複数の VM のスケーリングには役立たないことを理解しています。

これが私がこれまでに持っている構成です

    <int:gateway id="transactionGateway" default-reply-channel="transaction-reply"
    default-request-channel="transaction-request" default-reply-timeout="10000"
    service-interface="com.test.abc.integration.service.ProcessGateway">
</int:gateway>

<int-jms:outbound-gateway id="transactionJMSGateway"
    correlation-key="JMSCorrelationID" request-channel="transaction-request"
    request-destination="transactionInputQueue" reply-channel="transaction-reply"
    reply-destination="transactionOutputQueue" extract-reply-payload="true"
    extract-request-payload="true">
    <int-jms:reply-listener
        max-concurrent-consumers="20" receive-timeout="5000"
        max-messages-per-task="1" />
</int-jms:outbound-gateway>

<!-- Inbound Gateway for Splitter -->
<int-jms:inbound-gateway id="splitterGateWay"
    request-destination="transactionInputQueue" request-channel="splitter-input"
    reply-channel="splitter-output" concurrent-consumers="1"
    default-reply-destination="processInputQueue"
    max-concurrent-consumers="1" extract-reply-payload="true"
    correlation-key="JMSCorrelationID" extract-request-payload="true" />

<!-- Inbound Gateway Invokes Service Activator and Sends response back to 
    the channel -->
<int-jms:inbound-gateway id="seriveActivatorGateway"
    request-destination="processInputQueue" request-channel="process-input"
    reply-channel="process-output" concurrent-consumers="1"
    default-reply-destination="processOutputQueue"
    max-concurrent-consumers="1" extract-reply-payload="true"
    correlation-key="JMSCorrelationID" extract-request-payload="true"
    max-messages-per-task="1" />

<int-jms:inbound-gateway id="aggregatorGateway"
    request-destination="processOutputQueue" request-channel="aggregator-input"
    reply-channel="aggregator-output" concurrent-consumers="1"
    default-reply-destination="transactionOutputQueue"
    max-concurrent-consumers="1" extract-reply-payload="true"
    extract-request-payload="true" max-messages-per-task="1"
    correlation-key="JMSCorrelationID" />


<int:splitter id="transactionSplitter" input-channel="splitter-input"
    ref="processSplitter" output-channel="splitter-output">
</int:splitter>

<int:service-activator id="jbpmServiceActivator"
    input-channel="process-input" ref="jbpmService" requires-reply="true"
    output-channel="process-output">
</int:service-activator>

<int:aggregator id="transactionAggregator"
    input-channel="aggregator-input" method="aggregate" ref="processAggregator"
    output-channel="aggregator-output" message-store="processResultMessageStore"
    send-partial-result-on-expiry="false">
</int:aggregator>

ゲートウェイを使用する前に、JMS を使用したチャネルを使用してみましたが、そのアプローチも成功しませんでした。私が今直面している問題は、スプリッターが に返信するようになったこと transactionOutputQueueです。jms:header-enricher で遊んでみましたが、あまり成功しませんでした。問題 /SI に対する私のアプローチには、根本的な欠陥があるのではないかと感じています。ヘルプ/ガイダンスは大歓迎です。

また、上記で提供したコード スニペットでは、シンプルなイン メモリ アグリゲーターを使用しています。これをクラスター全体で機能させる必要がある場合は、JDBC でサポートされたアグリゲーターが必要になる可能性があることを理解していますが、今のところ、このパターンを取得しようとしています。単一の VM での作業

Gary のコメントに基づいて更新された作業構成は次のとおりです。

<bean id="processOutputQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="test.com.abc.process.output" />
</bean>

<bean id="transactionOutputQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="test.com.abc.transaction.result" />
</bean>

<bean id="transactionInputQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="test.com.abc.transaction.input" />
</bean>

<int:gateway id="transactionGateway"
    default-request-channel="transaction-request" default-reply-timeout="10000"
    default-reply-channel="aggregator-output"
    service-interface="com.test.abc.integration.service.ProcessGateway">
</int:gateway>

<int:splitter id="transactionSplitter" input-channel="transaction-request"
    ref="processSplitter" output-channel="splitter-output">
</int:splitter>


<int-jms:outbound-gateway id="splitterJMSGateway"
    correlation-key="JMSCorrelationID" request-channel="splitter-output"
    request-destination="processInputQueue" reply-channel="aggregator-input"
    reply-destination="processOutputQueue" extract-request-payload="true"
    extract-reply-payload="true">
    <int-jms:reply-listener
        max-concurrent-consumers="20" receive-timeout="5000" />
</int-jms:outbound-gateway>

<!-- Inbound Gateway Invokes Service Activator and Sends response back to 
    the channel -->
<int-jms:inbound-gateway id="seriveActivatorGateway"
    request-destination="processInputQueue" request-channel="process-input"
    reply-channel="process-output" default-reply-destination="processOutputQueue"
    concurrent-consumers="5" max-concurrent-consumers="10"
    extract-reply-payload="true" correlation-key="JMSCorrelationID"
    extract-request-payload="true" max-messages-per-task="1" />

<int:service-activator id="jbpmServiceActivator"
    input-channel="process-input" ref="jbpmService" requires-reply="true"
    output-channel="process-output">
</int:service-activator>


<int:aggregator id="transactionAggregator"
    input-channel="aggregator-input" ref="processAggregator"
    output-channel="aggregator-output" message-store="processResultMessageStore"
    send-partial-result-on-expiry="false">
</int:aggregator>

<bean id="processResultMessageStore"
    class="org.springframework.integration.store.SimpleMessageStore" />
<bean id="processResultMessageStoreReaper"
    class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="processResultMessageStore" />
    <property name="timeout" value="5000" />
</bean>
<task:scheduled-tasks>
    <task:scheduled ref="processResultMessageStoreReaper"
        method="run" fixed-rate="1000" />
</task:scheduled-tasks>

<int:logging-channel-adapter id="logger"
    level="DEBUG" log-full-message="true" />

<int-stream:stdout-channel-adapter
    id="stdoutAdapter" channel="logger" />

JMS パイプラインを Service Activator のみに制限しました。これは、私が当初望んでいたものです。

上記のアプローチに基づいて私が持っている唯一の質問は、これを複数の VMS で使用する場合でも、アグリゲーターをデータベースでサポートする必要があるかということです (その前にある JMS ゲートウェイは、有効な相関 ID ?)

よろしく ,

4

1 に答える 1

2

おそらく、すべてのコンポーネント間で JMS を使用する必要はありません。ただし、このようなチェーン ゲートウェイのテスト ケースは多数あり、すべて正常に動作します。

何かが間違って配線されている必要があります。完全な構成を示していないため、推測するのは困難です。

必ず最新バージョン (2.2.4) を使用し、DEBUG ロギングをオンにして、フローのメッセージに従ってください。メッセージ ペイロードが JMS の境界を越えて識別可能である限り、どこで問題が発生したかを簡単に把握できるはずです。

于 2013-06-18T07:16:41.017 に答える