1

何百もの一時キューを表示する代わりに、固定応答キューを使用できるようにする RabbitMQ 構成を実装しようとしています。公開された最初のメッセージは、応答キューを介してすぐに応答を取得し、2 番目、3 番目、場合によっては 5 番目のメッセージでも、スタック トレースにReply received after timeout. 少し待ってから別のメッセージを送信すると、すぐに連続するメッセージが同じエラーで再び失敗して、再び応答が返されます。

パブリッシャー側では、次の構成があります。

<bean id="nativeConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
    <property name="connectionTimeout" value="${rabbit.connection.timeout}"/>
    <property name="requestedHeartbeat" value="${rabbit.heartbeat}"/>
</bean>
<rabbit:connection-factory
        id="connectionFactory"
        port="${rabbit.port}"
        virtual-host="${rabbit.virtual}"
        host="${rabbit.host}"
        username="${rabbit.username}"
        password="${rabbit.password}"
        connection-factory="nativeConnectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template
        id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-timeout="${rabbit.rpc.timeout}"
        reply-queue="reply">
    <rabbit:reply-listener />
</rabbit:template>

<rabbit:queue id="reply" name="reply" />

コンシューマー側では、次の構成があります。

<bean id="nativeConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
    <property name="connectionTimeout" value="${rabbit.connection.timeout}"/>
    <property name="requestedHeartbeat" value="${rabbit.heartbeat}"/>
</bean>
<rabbit:connection-factory
        id="connectionFactory"
        port="${rabbit.port}"
        virtual-host="${rabbit.virtual}"
        host="${rabbit.host}"
        username="${rabbit.username}"
        password="${rabbit.password}"
        connection-factory="nativeConnectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template
        id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-timeout="${rabbit.rpc.timeout}"
        reply-queue="reply">
    <rabbit:reply-listener concurrency="${rabbit.consumers}" />
</rabbit:template>

<!-- Register Queue Listener Beans -->
<rabbit:listener-container
        connection-factory="connectionFactory"
        channel-transacted="true"
        requeue-rejected="true"
        concurrency="${rabbit.consumers}">
    <rabbit:listener queues="test" ref="TestProcessor" method="onMessage" />
</rabbit:listener-container>

<rabbit:queue id="test" name="test" />
<rabbit:queue id="reply" name="reply" />

私は spring-amqp 1.4.4 を使用しています。

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.4.4.RELEASE</version>
    </dependency>

これは、メッセージを作成して公開する方法です。

MessageProperties properties = new MessageProperties();          
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message message = new Message(toJson(request).getBytes(), properties);
Message res = getTemplate().sendAndReceive(exchange, queue, message);

テンプレートは単純に AmqpTemplate の自動配線されたインスタンスです。

@Autowired
AmqpTemplate template;

最初のメッセージはすぐに応答を取得し、2 番目のメッセージ (および 3 番目など) はコンシューマ側で次のスタック トレースを取得します。

2015-04-22 07:53:03,329 [SimpleAsyncTaskExecutor-1] WARN  org.springframework.amqp.rabbit.core.RabbitTemplate - Reply received after timeout for 4bfb2f6f-2e31-414c-9ec3-a4672e4c7e34
2015-04-22 07:53:03,330 [SimpleAsyncTaskExecutor-1] WARN  org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:864)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:802)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:690)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:989)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
    at java.lang.Thread.run(Thread.java:744)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
    at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:1276)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:799)
    ... 10 more

... パブリッシャーは、応答キューで応答が得られなかった後にタイムアウトします。

これは、消費者側でメッセージに応答する方法です。

   @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            ...
            System.out.println(message);
            // handle reply-to
            if (message.getMessageProperties() != null && message.getMessageProperties().getReplyTo() != null) {

                Message res = new Message(toJson(response).getBytes(), message.getMessageProperties());
                getTemplate().send("", message.getMessageProperties().getReplyTo(), res);

            }
        } catch (Exception e) {
            e.printStackTrace();
            // TODO: forward to exception queue here
        }    
    }

それSystem.out.println(message);は以下を出力します:

(Body:'{"message":"Sent 'Test Text' on Wed Apr 22 08:17:13 SAST 2015"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=[56, 50, 98, 100, 100, 56, 53, 54, 45, 57, 101, 100, 102, 45, 52, 99, 54, 97, 45, 97, 55, 51, 101, 45, 102, 54, 48, 101, 50, 49, 48, 53, 55, 101, 97, 48], replyTo=reply, contentType=application/json, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=test, deliveryTag=1, messageCount=0])

何か案は?

4

2 に答える 2

2

2 つのウサギ テンプレートがあり、それぞれが同じreplyキューを使用しています。そのため、「2 番目の」応答はコンシューマ側のテンプレートによって「受信」されています (したがって、ログ メッセージは、待機中の未解決の要求がないときに「応答」を受信したためです)。返信 - プロデューサー側では終わりです)。

rabbitmq 3.4 以降では、通常は新しい rabbit 組み込みの直接返信機能を使用する方がよいことに注意してください。通常、これにより、固定の返信先キュー メカニズムを実装しなければならなかったすべての理由が解決されます。直接返信先のサポートは、Spring AMQP 1.4.1.RELEASE で追加されました。

于 2015-04-22T07:28:55.680 に答える