11

Situation

At present, we use some custom code on top of ActiveMQ libraries for JMS messaging. I have been looking at switching to Camel, for ease of use, ease of maintenance, and reliability.

Problem

With my present configuration, Camel's ActiveMQ implementation is substantially slower than our old implementation, both in terms of delay per message sent and received, and time taken to send and receive a large flood of messages. I've tried tweaking some configuration (e.g. maximum connections), to no avail.

Test Approach

I have two applications, one using our old implementation, one using a Camel implementation. Each application sends JMS messages to a topic on local ActiveMQ server, and also listens for messages on that topic. This is used to test two Scenarios: - Sending 100,000 messages to the topic in a loop, and seen how long it takes from start of sending to end of handling all of them. - Sending a message every 100 ms and measuring the delay (in ns) from sending to handling each message.

Question

Can I improve upon the implementation below, in terms of time sent to time processed for both floods of messages, and individual messages? Ideally, improvements would involve tweaking some config that I have missed, or suggesting a better way to do it, and not be too hacky. Explanations of improvements would be appreciated.

Edit: Now that I am sending messages asyncronously, I appear to have a concurrency issue. receivedCount does not reach 100,000. Looking at the ActiveMQ web interface, 100,000 messages are enqueued, and 100,000 dequeued, so it's probably a problem on the message processing side. I've altered receivedCount to be an AtomicInteger and added some logging to aid debugging. Could this be a problem with Camel itself (or the ActiveMQ components), or is there something wrong with the message processing code? As far as I can tell, only ~99,876 messages are making it through to floodProcessor.process.

Test Implementation

Edit: Updated with async sending and logging for concurrency issue.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsConfiguration;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.log4j.Logger;

public class CamelJmsTest{
    private static final Logger logger = Logger.getLogger(CamelJmsTest.class);

    private static final boolean flood = true;
    private static final int NUM_MESSAGES = 100000;

    private final CamelContext context;
    private final ProducerTemplate producerTemplate;

    private long timeSent = 0;

    private final AtomicInteger sendCount = new AtomicInteger(0);
    private final AtomicInteger receivedCount = new AtomicInteger(0);

    public CamelJmsTest() throws Exception {
        context = new DefaultCamelContext();

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(connectionFactory);

        JmsConfiguration jmsConfiguration = new JmsConfiguration(pooledConnectionFactory);
        logger.info(jmsConfiguration.isTransacted());

        ActiveMQComponent activeMQComponent = ActiveMQComponent.activeMQComponent();
        activeMQComponent.setConfiguration(jmsConfiguration);

        context.addComponent("activemq", activeMQComponent);

        RouteBuilder builder = new RouteBuilder() {
            @Override
            public void configure() {
                Processor floodProcessor = new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        int newCount = receivedCount.incrementAndGet();

                        //TODO: Why doesn't newCount hit 100,000? Remove this logging once fixed
                        logger.info(newCount + ":" + exchange.getIn().getBody());

                        if(newCount == NUM_MESSAGES){
                            logger.info("all messages received at " + System.currentTimeMillis());
                        }
                    }
                };

                Processor spamProcessor = new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        long delay = System.nanoTime() - timeSent;

                        logger.info("Message received: " + exchange.getIn().getBody(List.class) + " delay: " + delay);
                    }
                };

                from("activemq:topic:test?exchangePattern=InOnly")//.threads(8) // Having 8 threads processing appears to make things marginally worse
                    .choice()
                        .when(body().isInstanceOf(List.class)).process(flood ? floodProcessor : spamProcessor)
                    .otherwise().process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            logger.info("Unknown message type received: " + exchange.getIn().getBody());
                        }
                    });
            }
        };

        context.addRoutes(builder);

        producerTemplate = context.createProducerTemplate();
        // For some reason, producerTemplate.asyncSendBody requires an Endpoint to be passed in, so the below is redundant:
//      producerTemplate.setDefaultEndpointUri("activemq:topic:test?exchangePattern=InOnly");
    }

    public void send(){
        int newCount = sendCount.incrementAndGet();
        producerTemplate.asyncSendBody("activemq:topic:test?exchangePattern=InOnly", Arrays.asList(newCount));
    }

    public void spam(){
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                timeSent = System.nanoTime();
                send();
            }
        }, 1000, 100, TimeUnit.MILLISECONDS);
    }

    public void flood(){
        logger.info("starting flood at " + System.currentTimeMillis());
        for (int i = 0; i < NUM_MESSAGES; i++) {
            send();
        }
        logger.info("flooded at " + System.currentTimeMillis());
    }

    public static void main(String... args) throws Exception {
        CamelJmsTest camelJmsTest = new CamelJmsTest();
        camelJmsTest.context.start();

        if(flood){
            camelJmsTest.flood();
        }else{
            camelJmsTest.spam();
        }
    }
}
4

2 に答える 2

5

現在の状況からJmsConfiguration、単一のスレッドでメッセージのみを消費しているようです。これは意図したものですか?

concurrentConsumersそうでない場合は、プロパティをより高い値に設定する必要があります。これにより、宛先にサービスを提供する JMS リスナーのスレッドプールが作成されます。

例:

JmsConfiguration config = new JmsConfiguration(pooledConnectionFactory);
config.setConcurrentConsumers(10);

これにより、キューからのメッセージを同時に処理する 10 個の JMS リスナー スレッドが作成されます。

編集:

トピックについては、次のようなことができます。

JmsConfiguration config = new JmsConfiguration(pooledConnectionFactory);
config.setConcurrentConsumers(1);
config.setMaxConcurrentConsumers(1);

そして、あなたのルートで:

from("activemq:topic:test?exchangePattern=InOnly").threads(10)

また、ActiveMQ では、仮想宛先を使用できます。仮想トピックはキューのように機能し、通常のキューに使用するのと同じ concurrentConsumers メソッドを使用できます。

さらに編集(送信用):

現在、ブロック送信を行っています。あなたがする必要がありますproducerTemplate.asyncSendBody()


編集

あなたのコードでプロジェクトをビルドして実行しました。floodProcessor私はあなたのメソッドにブレークポイントを設定し、newCount100,000 に達しています。ロギングと、非同期で送受信しているという事実にうんざりしている可能性があると思います。私のマシンでは newCount が 100,000 に達し、"all messages recieved"メッセージは実行後 1 秒以内にログに記録されましたが、プログラムはバッファリングされてからさらに 45 秒間ログを記録し続けました。newCountロギングを減らすことで、数値が体の数値にどれだけ近いかについて、ロギングの効果を確認できます。ロギングを に変更しinfo、キャメル ロギングをオフにすると、ロギングの最後に 2 つの数値が一致しました。

INFO  CamelJmsTest - 99996:[99996]
INFO  CamelJmsTest - 99997:[99997]
INFO  CamelJmsTest - 99998:[99998]
INFO  CamelJmsTest - 99999:[99999]
INFO  CamelJmsTest - 100000:[100000]
INFO  CamelJmsTest - all messages received at 1358778578422
于 2013-01-18T15:21:03.943 に答える