何百もの一時キューを表示する代わりに、固定応答キューを使用できるようにする RabbitMQ 構成を実装しようとしています。公開された最初のメッセージは、応答キューを介してすぐに応答を取得し、2 番目、3 番目、場合によっては 5 番目のメッセージでも、スタック トレースにReply received after timeout
. 少し待ってから別のメッセージを送信すると、すぐに連続するメッセージが同じエラーで再び失敗して、再び応答が返されます。
パブリッシャー側では、次の構成があります。
<bean id="nativeConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
<property name="connectionTimeout" value="${rabbit.connection.timeout}"/>
<property name="requestedHeartbeat" value="${rabbit.heartbeat}"/>
</bean>
<rabbit:connection-factory
id="connectionFactory"
port="${rabbit.port}"
virtual-host="${rabbit.virtual}"
host="${rabbit.host}"
username="${rabbit.username}"
password="${rabbit.password}"
connection-factory="nativeConnectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template
id="amqpTemplate"
connection-factory="connectionFactory"
reply-timeout="${rabbit.rpc.timeout}"
reply-queue="reply">
<rabbit:reply-listener />
</rabbit:template>
<rabbit:queue id="reply" name="reply" />
コンシューマー側では、次の構成があります。
<bean id="nativeConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
<property name="connectionTimeout" value="${rabbit.connection.timeout}"/>
<property name="requestedHeartbeat" value="${rabbit.heartbeat}"/>
</bean>
<rabbit:connection-factory
id="connectionFactory"
port="${rabbit.port}"
virtual-host="${rabbit.virtual}"
host="${rabbit.host}"
username="${rabbit.username}"
password="${rabbit.password}"
connection-factory="nativeConnectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template
id="amqpTemplate"
connection-factory="connectionFactory"
reply-timeout="${rabbit.rpc.timeout}"
reply-queue="reply">
<rabbit:reply-listener concurrency="${rabbit.consumers}" />
</rabbit:template>
<!-- Register Queue Listener Beans -->
<rabbit:listener-container
connection-factory="connectionFactory"
channel-transacted="true"
requeue-rejected="true"
concurrency="${rabbit.consumers}">
<rabbit:listener queues="test" ref="TestProcessor" method="onMessage" />
</rabbit:listener-container>
<rabbit:queue id="test" name="test" />
<rabbit:queue id="reply" name="reply" />
私は spring-amqp 1.4.4 を使用しています。
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.4.RELEASE</version>
</dependency>
これは、メッセージを作成して公開する方法です。
MessageProperties properties = new MessageProperties();
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message message = new Message(toJson(request).getBytes(), properties);
Message res = getTemplate().sendAndReceive(exchange, queue, message);
テンプレートは単純に AmqpTemplate の自動配線されたインスタンスです。
@Autowired
AmqpTemplate template;
最初のメッセージはすぐに応答を取得し、2 番目のメッセージ (および 3 番目など) はコンシューマ側で次のスタック トレースを取得します。
2015-04-22 07:53:03,329 [SimpleAsyncTaskExecutor-1] WARN org.springframework.amqp.rabbit.core.RabbitTemplate - Reply received after timeout for 4bfb2f6f-2e31-414c-9ec3-a4672e4c7e34
2015-04-22 07:53:03,330 [SimpleAsyncTaskExecutor-1] WARN org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:864)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:802)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:690)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:989)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:1276)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:799)
... 10 more
... パブリッシャーは、応答キューで応答が得られなかった後にタイムアウトします。
これは、消費者側でメッセージに応答する方法です。
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
...
System.out.println(message);
// handle reply-to
if (message.getMessageProperties() != null && message.getMessageProperties().getReplyTo() != null) {
Message res = new Message(toJson(response).getBytes(), message.getMessageProperties());
getTemplate().send("", message.getMessageProperties().getReplyTo(), res);
}
} catch (Exception e) {
e.printStackTrace();
// TODO: forward to exception queue here
}
}
それSystem.out.println(message);
は以下を出力します:
(Body:'{"message":"Sent 'Test Text' on Wed Apr 22 08:17:13 SAST 2015"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=[56, 50, 98, 100, 100, 56, 53, 54, 45, 57, 101, 100, 102, 45, 52, 99, 54, 97, 45, 97, 55, 51, 101, 45, 102, 54, 48, 101, 50, 49, 48, 53, 55, 101, 97, 48], replyTo=reply, contentType=application/json, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=test, deliveryTag=1, messageCount=0])
何か案は?