メッセージのプロデューサーはメッセージを永続的に送信していません。MessageListener を介してメッセージを消費しようとすると、例外 (実行時) が発生すると、特定の回数 (デフォルトは AMQ 側から 6 回) 再試行され、メッセージが失われます。
理由は、プロデューサーが配信モードを永続として設定していないため、一定回数再試行した後、DLQ が作成されず、メッセージが DLQ に移動しないためです。このため、メッセージを失いました。
私のコードは次のようなものです:-
@Configuration
@PropertySource("classpath:application.properties")
public class ActiveMqJmsConfig {
@Autowired
private AbcMessageListener abcMessageListener;
public DefaultMessageListenerContainer purchaseMsgListenerforAMQ(
@Qualifier("AMQConnectionFactory") ConnectionFactory amqConFactory) {
LOG.info("Message listener for purchases from AMQ : Starting");
DefaultMessageListenerContainer defaultMessageListenerContainer =
new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setConnectionFactory(amqConFactory);
defaultMessageListenerContainer.setMaxConcurrentConsumers(4);
defaultMessageListenerContainer
.setDestinationName(purchaseReceivingQueueName);
defaultMessageListenerContainer
.setMessageListener(abcMessageListener);
defaultMessageListenerContainer.setSessionTransacted(true);
return defaultMessageListenerContainer;
}
@Bean
@Qualifier(value = "AMQConnectionFactory")
public ConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory amqConnectionFactory =
new ActiveMQConnectionFactory();
amqConnectionFactory
.setBrokerURL(System.getProperty(tcp://localhost:61616));
amqConnectionFactory
.setUserName(System.getProperty(admin));
amqConnectionFactory
.setPassword(System.getProperty(admin));
return amqConnectionFactory;
}
}
@Component
public class AbcMessageListener implements MessageListener {
@Override
public void onMessage(Message msg) {
//CODE implementation
}
}
問題 :- client-id を接続レベル (Connection.setclientid("String")) で設定することにより、メッセージが永続的でない場合でも永続的サブスクライバーとしてサブスクライブできます。これにより、アプリケーションが実行時例外をスローした場合、一定回数の再試行の後、Queue に対して DLQ が作成され、メッセージが DLQ に移動されます。
ただし、DefaultMessageListenerContainer では、接続はクライアントに公開されません。クラス自体がプールとして維持していると思います。
DefaultMessageListenerContainer で永続的なサブスクリプションを実現するにはどうすればよいですか?