4

私は、アプリケーションの 1 つのメッセージング インターフェイスを開発中です。アプリケーションは、「ジョブ」を受け入れ、何らかの処理を行い、結果を (実際にはファイルの形式で) 返すように設計されたサービスです。

アイデアは、RabbitMQ をメッセージング インフラストラクチャとして使用し、Spring AMQP を使用してプロトコル固有の詳細を処理することです。

コードから Spring AMQP への密結合を望んでいないため、Spring Integration を使用してメッセージング API を隠したいと考えています。だから基本的に私はこれが欲しい:

RabbitMQ に送信されたメッセージ ====> Spring AMQP ====> Spring Integration ====> MyService ====> RabbitMQ に返信する

これを結び付けるために必要な XML 構成を考え出そうとしていますが、複数レベルの抽象化と異なる用語に問題があります。Spring AMQP/RabbitMQ の上で Spring Integration を実証する実用的な例を見つけることは、この種のセットアップが私にとって非常に「ベスト プラクティス」に感じられるという事実にもかかわらず、驚くほど難しいことが証明されています。

1) それで.. 優秀な魂がこれをざっと見て、おそらく私を正しい方向に押し進めることができますか? 何が必要で何が不要か? :-)

2) 理想的には、キューはマルチスレッド化する必要があります。つまり、taskExecutor は複数のメッセージを並列処理のために jobService に渡す必要があります。どのような構成が必要ですか?

 <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
    xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
    xsi:schemaLocation="
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
    http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
    http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
    ">

    <context:component-scan base-package="com.myprogram.etc" />

    <!-- Messaging infrastructure: RabbitMQ -->

    <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="${ei.messaging.amqp.servername}" />
        <property name="username" value="${ei.messaging.amqp.username}" />
        <property name="password" value="${ei.messaging.amqp.password}" />
    </bean>

    <rabbit:connection-factory id="connectionFactory" />

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- From RabbitMQ -->

    <int-amqp:inbound-gateway request-channel="fromAMQP" reply-channel="toAMQP" queue-names="our-product-name-queue" connection-factory="connectionFactory"/>

    <!-- Spring Integration configuration -->

    <int:channel id="fromAMQP">
        <!-- Is this necessary?? -->
        <int:queue/>
    </int:channel>

    <!-- JobService is a @Service with a @ServiceActivator annotation -->
    <int:service-activator input-channel="fromAMQP" ref="jobService"/>
</beans>
4

1 に答える 1

6

私はあなたと同じくらい春の統合と春の統合-amqpの初心者だと思いますが、1つのサンプルプロジェクトに基づいて何かが機能しました.

rabbitmq インフラストラクチャの場合、次のものがあります。

<rabbit:connection-factory id="rabbitConnectionFactory"/>

<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"/>

<rabbit:admin connection-factory="rabbitConnectionFactory"/>

<!-- some attributes seemed to be ok with queue name, others required id
  -- so I used both with the same value -->
<rabbit:queue id='test.queue' name='test.queue'/>

<rabbit:direct-exchange name:"my.exchange">
    <rabbit:bindings>
        <rabbit:binding queue="test.queue" key="test.binding"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

rabbitmq にメッセージを送信するには、次のものが必要です。

<!-- This is just an interface definition, no implementation required
  -- spring will generate an implementation which puts a message on the channel -->
<int:gateway id="backgroundService", 
         service-interface="com.company.BackgroundService"
             default-request-channel="toRabbit"

<int:channel id:"toRabbit"/>

<!-- used amqpTemplate to send messages on toRabbit channel to rabbitmq -->
<int-amqp:outbound-channel-adapter channel:"toRabbit" 
                               amqp-template="amqpTemplate" 
                   exchange-name="my.exchange" 
                   routing-key="test.binding"/>

メッセージを受信するには、次のものが必要です。

<int:service-activator input-channel="fromRabbit" 
                       ref="testService" 
                       method="serviceMethod"/>


// from rabbitmq to local channel
<int-amqp:inbound-channel-adapter channel="fromRabbit" 
                                  queue-names="test.queue" 
                                  connection-factory="rabbitConnectionFactory"/>

<int:channel id="fromRabbit"/>

いくつかの注意点 - spring-integration での amqp 統合のドキュメントには、戻り値の同期送受信が可能であると書かれていますが、私はまだそれを理解していません。サービス アクティベーター メソッドが値を返すと、例外がスローされ、メッセージが rabbitmq に戻されます (そして、メッセージを再度受信して例外を再度スローするため、無限ループが生成されます)。

私の BackgroundService インターフェイスは次のようになります。

package com.company

import org.springframework.integration.annotation.Gateway

public interface BackgroundService {

    //@Gateway(requestChannel="someOtherMessageChannel")
    public String sayHello(String toWho)

}

Spring Bean で構成されたデフォルトのチャネルを使用したくない場合は、アノテーションを介してすべてのメソッドでチャネルを指定できます。

service-activator にアタッチされたサービスは次のようになります。

package com.company;

class TestService {

    public void serviceMethod(String param) {
    log.info("serviceMethod received: " + param");
    //return "hello, " + param;
    }
}

rabbitmq を使用せずにローカルですべてを配線したところ、呼び出し元が戻り値を正しく受け取りました。私がrabbitmqチャネルに行ったとき、値を返した後に例外がスローされたときに、前述の無限ループが発生しました。コードを変更せずに別のチャネルに配線することは確かに可能です。解決方法が分かれば回答お願いします。もちろん、必要に応じて、好きなルーティング、変換、フィルタリングをエンドポイント間に配置できます。

上記の XML の抜粋にタイプミスがあっても驚かないでください。Groovy DSL から xml に戻す必要があったため、間違いを犯した可能性があります。しかし、意図は十分に明確である必要があります。

于 2012-04-30T21:09:22.810 に答える