毎秒500以上の大量のストリームを処理しています。データはSpringAMQP+ Rabbitで、10個の同時コンシューマーを持つSimpleMessageListenerContainerを使用して消費されます。15分ごとにDbをチェックし、処理のために特定のプロパティをリロードする必要があります。これは、15分ごとに起動し、SimplelistenerContainerを停止し、必要な作業を実行して、コンテナーを再度開始するクォーツトリガーを使用して実行されます。
アプリの起動時にすべてが完全に機能し、トリガーが起動してコンテナーが再起動すると、同じメッセージが複数回配信されるのがわかります。これにより、多くの重複が発生します。消費者によって投げられる例外はありません。
メッセージリスナー
class RoundRobinQueueListener implements MessageListener {
@Override
public void onMessage(Message message) { //do processing
}
}
アプリの起動中に並列コンシューマーを設定し、コンシューマーを起動します
final SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
RoundRobinQueueListener roundRobinListener = RoundRobinQueueListener.class.newInstance();
messageListenerContainer.setQueueNames(queueName);
messageListenerContainer.setMessageListener(roundRobinListener);
messageListenerContainer.setConcurrentConsumers(10);
messageListenerContainer.setChannelTransacted(true);
クォーツトリガー
void execute(JobExecutionContext context) throws JobExecutionException {
messageListenerContainer.stop()
//Do db task, other processing
messageListenerContainer.start()
}