1

RabbitMQ とは対照的に、ActiveMQ を使用した方が (パブリッシュと消費の両方で) より高速な生のメッセージング スループットを達成するのは珍しいことでしょうか? 私が遭遇した他のすべてのオンライン リファレンスは、RabbitMQ が高速であると自慢しているため、質問しています。

私は正規のベンチマーク ツールでテストしていません。代わりに、両方の基本的なパブリッシャー/コンシューマーの例を変更して、3 キロバイトのメッセージ本文で 100,000 件のメッセージをテストしました。2 つの異なる Amazon EC2 x-large インスタンスでパブリッシュとコンシュームの両方をテストしていることに注意してください。コードを正しく設定していない可能性がありますか? 以下の結果とコードをご覧ください。

ActiveMQ Send 3kb   
Average Time per Message (ns):  497276.1179
Average # Messages per second:  2010.935101
Total Time (s):                 49.72810906

ActiveMQ Recv 3kb   
Average Time per Message (ns):  43813.35476
Average # Messages per second:  22823.86285
Total Time (s):                 4.381379289

RabbitMQ Send 3kb   
Average Time per Message (ns):  1041524.626
Average # Messages per second:  960.1309229
Total Time (s):                 104.1524626

RabbitMQ Recv 3kb   
Average Time per Message (ns):  612559.3732
Average # Messages per second:  1632.494814
Total Time (s):                 61.25593732

RabbitMQ Send.java および Recv.java で queueDeclare() を削除した後に更新された数値:

これにより、RabbitMQ の時間が大幅に改善されましたが、ActiveMQ の消費時間はわずか 4 秒で、何かがおかしいに違いありません...

ActiveMQ Send 3kb   
Average Time per Message (ns):  491404.5666
Average # Messages per second:  2034.983124
Total Time (s):                 49.14045666

ActiveMQ Recv 3kb   
Average Time per Message (ns):  41976.17158
Average # Messages per second:  23823.03965
Total Time (s):                 4.197617158

RabbitMQ Send 3kb   
Average Time per Message (ns):  354795.8818
Average # Messages per second:  2818.522005
Total Time (s):                 35.47958818

RabbitMQ Recv 3kb   
Average Time per Message (ns):  440349.3892
Average # Messages per second:  2270.924009
Total Time (s):                 44.03493892

ActiveMQ Send.java

public class Send implements Runnable {

private final static int NUMBER_OF_MESSAGES = 100000;
private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;

public static void main(String[] argv) throws java.io.IOException {
    (new Thread(new Send())).start();
}

public void run() {
    try {
        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

        // Create a Connection
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // Create a Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the destination (Topic or Queue)
        Destination destination = session.createQueue("TEST.FOO");

        // Create a MessageProducer from the Session to the Topic or Queue
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        for (int i=0; i <= NUMBER_OF_MESSAGES; i++){
            startTime = System.nanoTime();

            // 3kb
            String text = "AMFu8UlKW2zJBxUQbxNfU3HneB11uEOeC..."

            TextMessage message = session.createTextMessage(text);

// Tell the producer to send the message
            //System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
            producer.send(message);
            stopTime = System.nanoTime();
            totalTime = totalTime + stopTime-startTime;
            System.out.println(i + "," + Long.toString(stopTime-startTime));

        }

        // Clean up
        session.close();
        connection.close();

        //System.out.println("");
        //System.out.println("Total Time: " + totalTime + "ns");
        //System.out.println("Avg. Time: " + totalTime/NUMBER_OF_MESSAGES + "ns");
        //System.out.println("");

    }
    catch (Exception e) {
        System.out.println("Caught: " + e);
        e.printStackTrace();
    }
}
}

ActiveMQ Recv.java

public class Recv implements Runnable {

private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;
private static int numMessages = 0;

public static void main(String[] argv)
    throws java.io.IOException {

    (new Thread(new Recv())).start();

}

public void run() {
    try {

        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://x.x.x.x:61616");

        // Create a Connection
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // Create a Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the destination (Topic or Queue)
        Destination destination = session.createQueue("TEST.FOO");

        // Create a MessageConsumer from the Session to the Topic or Queue
        MessageConsumer consumer = session.createConsumer(destination);

        // Message Listener
        MyListener listener = new MyListener();
        consumer.setMessageListener(listener);

        // Wait for a message
        //Message message = consumer.receive(1000);

       // consumer.close();
       // session.close();

// connection.close();
    } catch (Exception e) {
        System.out.println("Caught: " + e);
        e.printStackTrace();
    }
}

public class MyListener implements MessageListener {
    public void onMessage(Message message) {
        try {
            startTime = System.nanoTime();
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                stopTime = System.nanoTime();
                totalTime = totalTime + stopTime-startTime;

                System.out.println(numMessages + "," + Long.toString(stopTime-startTime));

                numMessages++;

            } else {
                System.out.println("Received: " + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
}

RabbitMQ Send.java

public class Send implements Runnable {

private final static String QUEUE_NAME = "hello";
private final static int NUMBER_OF_MESSAGES = 100000;
private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;

// 3kb
private static final String message = "AMFu8UlKW2zJB..."

public static void main(String[] argv)
 throws java.io.IOException {

 (new Thread(new Send())).start();

}

public void run() {

try {
     ConnectionFactory factory = new ConnectionFactory();
     factory.setHost("localhost");
     Connection connection = factory.newConnection();
     Channel channel = connection.createChannel();

     for (int i=1; i <= NUMBER_OF_MESSAGES; i++){
         startTime = System.nanoTime();

         // No Persistence
         // channel.queueDeclare(QUEUE_NAME, false, false, false, null);
         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

         stopTime = System.nanoTime();
         totalTime = totalTime + stopTime-startTime;
         System.out.println(i + "," + Long.toString(stopTime-startTime));
     }

     channel.close();
     connection.close();

 } catch (Exception e) {
     e.printStackTrace();
 }
}
}

RabbitMQ Recv.java

private final static String QUEUE_NAME = "hello";
private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;
private static int numMessages = 0;

public static void main(String[] argv) {
    (new Thread(new Recv())).start();
}

public void run(){
    try {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.x.x.x");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // No Persistence
        // channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, true, consumer);

        while (true) {
            startTime = System.nanoTime();
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            stopTime = System.nanoTime();
            totalTime = totalTime + stopTime-startTime;

            System.out.println(numMessages + "," + Long.toString(stopTime-startTime));

            numMessages++;

        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
}
4

1 に答える 1

2

さて、私はあなたのコードとあなたのベンチマークのマークを見ましたが、ちょうどRecvの方法で。RabbitMq の数値が ActiveMq の 2 倍になっていることがわかりました。それから両方のソースコードを見て、何かが私に警告しました..

Rabbitqm Recv ソース コードでは、すべてのメッセージに対して常に queuDeclare を実行しています。通信時間が現在の主要なレイテンシである場合、Rabbitmq よりも ActiveMq からの 2 倍の時間がここから来ることを確認してください。

于 2012-10-03T12:54:34.260 に答える