Spring Integration を使用して以下を構成しようとしています。
- チャネルにメッセージを送信します。
- このメッセージを、n 個のコンシューマーとのラビット ファンアウト (pub/sub) 交換にパブリッシュします。
- 各コンシューマーは応答メッセージを提供します。
- 元のクライアントに返す前に、Spring Integration にこれらのレスポンスを集約させます。
これまでのところ、いくつかの問題があります...
私は
apply-sequence="true"
、correlationId、sequenceSize、および sequenceNumber プロパティが設定されるようにプロパティを設定するために、publish-subscribe-channel を使用しています。これらのプロパティは、によって破棄されていDefaultAmqpHeaderMapper
ます。DEBUG headerName=[correlationId] WILL NOT be mapped
ファンアウト交換内に 2 つのキューが登録されているにもかかわらず、sequenceSize プロパティは 1 にしか設定されていません。おそらくこれは、メッセージがアグリゲータからリリースされるのが早すぎることを意味します。これは、使用するために publish-subscribe-channel を誤用しているためで
apply-sequence="true"
あり、サブスクライバーがint-amqp:outbound-gateway
.
私の送信Spring構成は次のとおりです。
<int:publish-subscribe-channel id="output" apply-sequence="true"/>
<int:channel id="reply">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:aggregator input-channel="reply" method="combine">
<bean class="example.SimpleAggregator"/>
</int:aggregator>
<int:logging-channel-adapter id="logger" level="INFO"/>
<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-channel="reply"/>
<int-amqp:outbound-gateway request-channel="output"
amqp-template="amqpTemplate" exchange-name="fanout-exchange"
reply-channel="reply"/>
私の rabbitMQ 設定は次のとおりです。
<rabbit:connection-factory id="connectionFactory" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="-1" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="a-queue"/>
<rabbit:queue name="b-queue"/>
<rabbit:fanout-exchange name="fanout-exchange">
<rabbit:bindings>
<rabbit:binding queue="a-queue" />
<rabbit:binding queue="b-queue" />
</rabbit:bindings>
</rabbit:fanout-exchange>
コンシューマーは次のようになります。
<int:channel id="input"/>
<int-amqp:inbound-gateway request-channel="input" queue-names="a-queue" connection-factory="connectionFactory" concurrent-consumers="1"/>
<bean id="listenerService" class="example.ListenerService"/>
<int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/>
どんな提案でも素晴らしいでしょう、私はスティックのどこかで間違った端を持っていると思います...
Gary のコメントに基づく新しいアウトバウンド スプリング構成:
<int:channel id="output"/>
<int:header-enricher input-channel="output" output-channel="output">
<int:correlation-id expression="headers['id']" />
</int:header-enricher>
<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-timeout="5000" default-reply-channel="reply" />
<int-amqp:outbound-gateway request-channel="output"
amqp-template="amqpTemplate" exchange-name="fanout-exchange"
reply-channel="reply"
mapped-reply-headers="amqp*,correlationId" mapped-request-headers="amqp*,correlationId"/>
<int:channel id="reply"/>
<int:aggregator input-channel="reply" output-channel="reply" method="combine" release-strategy-expression="size() == 2">
<bean class="example.SimpleAggregator"/>
</int:aggregator>