4

サポート付きでテストSpring-AMQPSpring-Integrationています。次の構成とテストを行いました。

<rabbit:connection-factory id="connectionFactory" />
<rabbit:queue name="durableQ"/>
<int:channel id="consumingChannel">
    <int:queue capacity="2"/> <!-- Message get Acked as-soon-as filled in Q -->
</int:channel>

<int-amqp:inbound-channel-adapter 
    channel="consumingChannel"
    queue-names="durableQ" 
    connection-factory="connectionFactory"
    concurrent-consumers="1"
    acknowledge-mode="AUTO"
    />


public static void main(String[] args) {
System.out.println("Starting consumer with integration..");
    AbstractApplicationContext context = new ClassPathXmlApplicationContext(
    "classpath:META-INF/spring/integration/spring-integration-context-consumer.xml");

    PollableChannel consumingChannel = context.getBean("consumingChannel",   
                                                          PollableChannel.class);           
        int count = 0;
        while (true) {
            Message<?> msg = consumingChannel.receive(1000);
            System.out.println((count++) + " \t -> " + msg);

            try { //sleep to check number of messages in queue
                Thread.sleep(50000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

この構成では、メッセージが に到着するとすぐにconsumingChannel確認応答され、キューから削除されることが明らかでした。と checkのsleep後に高い値を付けることで、これを検証しました。それ以上の制御はありません。receivequeue-size

今私が設定した場合、春の統合を介しacknowledge-mode=MANUALて手動で行うように見える方法はありません。ack

メッセージを処理する必要があり、処理後、メッセージがに保持さmanual-ackれるまでそうします。ackdurableQ

MANUALでackを処理する方法はありますspring-amqp-integrationか? 消費者の を制御したいので、に渡すことは避けたいChannelAwareMessageListenerです。inbound-channel-adapterreceive

アップデート:

listener-containerownをinbound-channel-adapter次のように使用する場合、それは可能ではないようです。

// Below creates a default direct-channel (spring-integration channel) named "adapter", to receive poll this channel which is same as above
<int-amqp:inbound-channel-adapter id="adapter" listener-container="amqpListenerContainer" /> 

<bean id="amqpListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="durableQ" />
    <property name="acknowledgeMode" value="MANUAL" />

// messageListener not allowed when using with adapter, so no way of having own ChannelAwareMessageListener, so no channel exposed onMessage, hence no way to ack
    <property name="messageListener" ref="listener"/>
</bean>
<bean id="listener" class="com.sd.springint.rmq.MsgListener"/>

プロパティが許可されていないため、上記の構成ではエラーがスローされmessageListenerます。タグのインライン コメントを参照してください。そのため、使用目的が無効になりました(を介しlistner-containerて公開するため)。channelChannelAwareMessageListener

spring-integrationには使用できませんmanual-acknowledgement(私は知っています、これは難しい言い方です!)、誰かがこれを検証するのを手伝ってくれますか、またはこれに必要な特定のアプローチ/構成がありませんか?

4

2 に答える 2

3

問題は、を使用して非同期ハンドオフを使用しているためですQueueChannel。通常は、コンテナー内で同時実行を制御し ( concurrent-consumers="2")、フロー内で非同期ハンドオフを行わない ( を使用するDirectChannel) ことをお勧めします。そうすれば、AUTO ack は問題なく動作します。PollableChannelサブスクライブ anew MessageHandler()からa への受信の代わりにSubscribableChannel

アップデート:

通常、SI アプリケーションでメッセージを処理する必要はありませんが、DirectChannel を使用したテストに相当するものは...

    SubscribableChannel channel = context.getBean("fromRabbit", SubscribableChannel.class);

    channel.subscribe(new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("Got " + message);
        }
    });
于 2014-01-10T13:29:00.223 に答える
0

MANUAL Ack は 経由でのみ許可されChannel.basicAck()ます。Channelそのため、メッセージを受信した にアクセスできる必要があります。

advice-chainof で遊んでみてください<int-amqp:inbound-channel-adapter>:

  1. Adviceいくつかを実装するMethodBeforeAdvice
  2. advice-chainオンコンテナが適用されるContainerDelegate#invokeListener
  3. そのメソッドの最初の引数は、まさにChannel
  4. MessageProperties.headersそのChannel中の that に配置できるとしますAdvice
  5. そして、それに設定し<int-amqp:inbound-channel-adapter>ます。mapped-request-headersChannel
  6. そして最後に、ダウンストリーム フローの任意の場所で、Spring Integration Message からbasicAck()そのChannelヘッダーで呼び出しを試みます。
于 2014-01-10T13:20:23.980 に答える