5

Spring 4.x の DefaultJmsListenerContainerFactory を使用して、ActiveMQ キューに接続し、@JmsListener を使用してそのキューからのメッセージを処理し、メッセージを同じ ActiveMQ ブローカーのトピックにプッシュします。

コンシューマ/リスナーとプロデューサの両方に単一のキャッシング接続ファクトリを使用しており、コンシューマではなくプロデューサをキャッシュできるように、キャッシュ コンシューマを false に設定しています。また、同時実行数を 1 ~ 3 に設定しました。これにより、アプリケーションの起動時にキューに少なくとも 1 つのコンシューマーが存在すると予想されます。メッセージが増加すると、コンシューマーの数は 3 に達します。メッセージが減少すると、コンシューマーの数も 1 に戻ると予想していました。しかし、スレッド (defaultmessagelistenercontainer-2/3) を見ると、それらは待機状態にあり、シャットダウンしていません。負荷が低下すると、多くのコンシューマーもシャットダウンすることが予想されるというのは、予想される動作ではないでしょうか? 以下の私の設定を見てください。

ApplicationContext.java

    @Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrency(environment.getProperty("jms.connections.concurrent"));
    factory.setSessionTransacted(environment.getProperty("jms.connections.transacted", Boolean.class));
    return factory;
}

@Bean
public CachingConnectionFactory connectionFactory(){
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(environment.getProperty("jms.redelivery.initial-delay", Long.class));
    redeliveryPolicy.setRedeliveryDelay(environment.getProperty("jms.redelivery.delay", Long.class));
    redeliveryPolicy.setMaximumRedeliveries(environment.getProperty("jms.redelivery.maximum", Integer.class));
    redeliveryPolicy.setUseExponentialBackOff(environment.getProperty("jms.redelivery.use-exponential-back-off", Boolean.class));
    redeliveryPolicy.setBackOffMultiplier(environment.getProperty("jms.redelivery.back-off-multiplier", Double.class));

    ActiveMQConnectionFactory activeMQ = new ActiveMQConnectionFactory(environment.getProperty("jms.queue.username"), environment.getProperty("jms.queue.password"), environment.getProperty("jms.broker.endpoint"));
    activeMQ.setRedeliveryPolicy(redeliveryPolicy);
    activeMQ.setPrefetchPolicy(prefetchPolicy());

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(activeMQ);
    cachingConnectionFactory.setCacheConsumers(environment.getProperty("jms.connections.cache.consumers", Boolean.class));
    cachingConnectionFactory.setSessionCacheSize(environment.getProperty("jms.cache.size", Integer.class));
    return cachingConnectionFactory;
}

@Bean
public JmsMessagingTemplate jmsMessagingTemplate(){
    ActiveMQTopic activeMQ = new ActiveMQTopic(environment.getProperty("jms.queue.out"));

    JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(connectionFactory());
    jmsMessagingTemplate.setDefaultDestination(activeMQ);

    return jmsMessagingTemplate;
}

アプリケーションのプロパティ

jms.connections.concurrent=1-3
jms.connections.prefetch=1000
jms.connections.transacted=true
jms.connections.cache.consumers=false
jms.redelivery.initial-delay=1000
jms.redelivery.delay=1000
jms.redelivery.maximum=5
jms.redelivery.use-exponential-back-off=true
jms.redelivery.back-off-multiplier=2
jms.cache.size=3
jms.queue.in=in.queue
jms.queue.out=out.queue
jms.broker.endpoint=failover:(tcp://localhost:61616)
4

1 に答える 1