RabbitMQ にメッセージを送信する Java アプリケーションを作成しました。次に、Flume は RabbitMQ キューからメッセージを取得します。Flume を除いて、誰もキューからメッセージをプルしないことに興味があります。
私のアプリケーションは Spring AMQP Java プラグインを使用しています。
問題:
以下のコードでは、メッセージが RabbitMQ キューに送られ、「不明」のままになります。私が理解しているように、RabbitMQ は MessageListener からの ACK を待っていますが、MessageListener は決して ACK しません。誰かがそれを修正する方法を知っていますか?
コード:
public class MyAmqpConfiguration {
@Autowired
ConnectionFactory connectionFactory;
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(activityLogsQueue());
container.setMessageListener(MyMessageListener());
container.setConcurrentConsumers(3);
return container;
}
@Bean(name="myTemplate")
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(MyMessageConverter());
return template;
}
}
public class MyMessageListener implements MessageListener {
public MyMessageListener(MessageConverter converter, MyMessageHandler<MyObject> messageHandler) {
this.converter = converter;
this.messageHandler = messageHandler;
}
@Override
public void onMessage(Message message) {
this.messageHandler.doThings();
}
}
public class MyMessageHandler {
@Autowired
@Qualifier("myTemplate")
RabbitTemplate template;
@Override
public void handleMessage(MyObject thing) {
template.convertAndSend(exchange, routingKey, thing);
}
}
public class MyMessageConverter extends JsonMessageConverter {
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) {
//do things
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
throw new UnsupportedOperationException("fromMessage is not supported in "+this.getClass().getName());
}
}