4

Spring Integration を使用して以下を構成しようとしています。

  1. チャネルにメッセージを送信します。
  2. このメッセージを、n 個のコンシューマーとのラビット ファンアウト (pub/sub) 交換にパブリッシュします。
  3. 各コンシューマーは応答メッセージを提供します。
  4. 元のクライアントに返す前に、Spring Integration にこれらのレスポンスを集約させます。

これまでのところ、いくつかの問題があります...

  1. 私はapply-sequence="true"、correlationId、sequenceSize、および sequenceNumber プロパティが設定されるようにプロパティを設定するために、publish-subscribe-channel を使用しています。これらのプロパティは、によって破棄されていDefaultAmqpHeaderMapperます。DEBUG headerName=[correlationId] WILL NOT be mapped

  2. ファンアウト交換内に 2 つのキューが登録されているにもかかわらず、sequenceSize プロパティは 1 にしか設定されていません。おそらくこれは、メッセージがアグリゲータからリリースされるのが早すぎることを意味します。これは、使用するために publish-subscribe-channel を誤用しているためでapply-sequence="true"あり、サブスクライバーがint-amqp:outbound-gateway.

私の送信Spring構成は次のとおりです。

<int:publish-subscribe-channel id="output" apply-sequence="true"/>

<int:channel id="reply">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>

<int:aggregator input-channel="reply" method="combine">
    <bean class="example.SimpleAggregator"/>
</int:aggregator>

<int:logging-channel-adapter id="logger" level="INFO"/>

<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-channel="reply"/>

<int-amqp:outbound-gateway request-channel="output"
                                   amqp-template="amqpTemplate" exchange-name="fanout-exchange"
                                   reply-channel="reply"/>

私の rabbitMQ 設定は次のとおりです。

<rabbit:connection-factory id="connectionFactory" />

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="-1" />

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:queue name="a-queue"/>
<rabbit:queue name="b-queue"/>

<rabbit:fanout-exchange name="fanout-exchange">
    <rabbit:bindings>
        <rabbit:binding queue="a-queue" />
        <rabbit:binding queue="b-queue" />
    </rabbit:bindings>
</rabbit:fanout-exchange>

コンシューマーは次のようになります。

<int:channel id="input"/>

<int-amqp:inbound-gateway request-channel="input" queue-names="a-queue" connection-factory="connectionFactory" concurrent-consumers="1"/>

<bean id="listenerService" class="example.ListenerService"/>

<int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/>

どんな提案でも素晴らしいでしょう、私はスティックのどこかで間違った端を持っていると思います...

Gary のコメントに基づく新しいアウトバウンド スプリング構成:

<int:channel id="output"/>

<int:header-enricher input-channel="output" output-channel="output">
    <int:correlation-id expression="headers['id']" />
</int:header-enricher>

<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-timeout="5000" default-reply-channel="reply" />

<int-amqp:outbound-gateway request-channel="output"
                                   amqp-template="amqpTemplate" exchange-name="fanout-exchange"
                                   reply-channel="reply"
                                   mapped-reply-headers="amqp*,correlationId" mapped-request-headers="amqp*,correlationId"/>

<int:channel id="reply"/>

<int:aggregator input-channel="reply" output-channel="reply" method="combine" release-strategy-expression="size() == 2">
    <bean class="example.SimpleAggregator"/>
</int:aggregator>
4

2 に答える 2

3

問題は、SIがファンアウト交換のトポロジーについて知らないことです。

これを回避する最も簡単な方法は、カスタムリリース戦略を使用することです

release-strategy-expression="size() == 2"

アグリゲーター上(ファンアウトが2であると想定)。したがって、シーケンスサイズは必要ありません。ヘッダーエンリッチャーを使用して、pub/subチャネルを「乱用」することを回避できます...

    <int:header-enricher input-channel="foo" output-channel="bar">
        <int:correlation-id expression="T(java.util.UUID).randomUUID().toString()" />
    </int:header-enricher>

すでに一意であるメッセージIDを使用することで、新しいUUIDの作成を回避できます...

<int:correlation-id expression="headers['id']" />

最後に、correlationIdヘッダーをAMQPに渡すには、次を追加します。

mapped-request-headers="correlationId"

amqpエンドポイントに。

于 2012-10-29T19:24:43.000 に答える