0

ここでのいくつかの構成: 非永続コンシューマー、非永続メッセージ、フロー制御の無効化、デフォルトのプリフェッチ サイズ、optimizeAcknowledge = true、asynsend = true、jms を使用して ActiveMQ に接続

例えば、

1 つの生産者、1 つの消費者、</p>

Producer————Topic————consumer

プロデューサーの送信レートは 6k/s に達する可能性があります

ただし、この場合: 1 つのプロデューサー 3 つのコンシューマー、

                /——consumer

Producer——-Topic——-consumer

                \——consumer

プロデューサーの送信レートが 4k/s に低下

これが私のキーコードの一部です:

送信者クラス:

public class sender {

    public Boolean durable=false;
    public String clientID=null;
    public Boolean transacted=false;
    public int ackMode=Session.AUTO_ACKNOWLEDGE;
    public int timeToLive=0;
    public String queuename = "";
    public int persistent = DeliveryMode.NON_PERSISTENT;

    public Connection createConnection(String user,String pwd,String url) throws JMSException, Exception {   
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
         connectionFactory.setDispatchAsync(true);
         //connectionFactory.setAlwaysSessionAsync(false);
         Connection connection = connectionFactory.createConnection();   
         if (durable && clientID!=null) {   
             connection.setClientID(clientID);   
         }   
         connection.start();   
         return connection;   
        }  

    public Session createSession(Connection connection) throws Exception {   
        Session session = connection.createSession(transacted, ackMode);   
        return session;   
       }   

    public MessageProducer createProducer(Session session) throws JMSException {   
        Queue destination = session.createQueue(queuename);   
        MessageProducer producer = session.createProducer(destination);   
        producer.setDeliveryMode(persistent);   

        if( timeToLive!=0 )   
            producer.setTimeToLive(timeToLive);   
        return producer;   
        }   

    public void onMessage(Message message) {   
         //process message   
         } 
}

sendmain メソッド:</p>

public static void main(String[] args) throws JMSException, Exception {
        // TODO Auto-generated method stub
        sender s = new sender();
        s.persistent = DeliveryMode.NON_PERSISTENT;
        Connection c = s.createConnection("","","tcp://localhost:61616?jms.useAsyncSend=true");
        Session sess = s.createSession(c);
        Topic topic = sess.createTopic("topic.test");
        MessageProducer mp = sess.createProducer(topic);
        StringBuffer tmpsb=new StringBuffer();
        for (int j=0;j<1024;j++)
        {
        tmpsb.append("0");
        }
        Message m = sess.createTextMessage(tmpsb.toString());
        long pre=System.currentTimeMillis();
        for (int i=0;i<10000;i++)
        {
            mp.send(m);
        }
        long post=System.currentTimeMillis();
        mp.close();
        System.out.println("sendtime:"+(post-pre));
        System.out.println("sendrate:"+10000000/(float)(post-pre));
        System.out.println("timenow:"+(post));
    }

レシーバー クラス コード:

public class receiver implements MessageListener
{
    public  int receivetimes=0;
    public long pretime;

    public void onMessage(Message msg)
    {
        //TextMessage tm = (TextMessage) msg;
        try {
            if (receivetimes==0)
            {
                pretime=System.currentTimeMillis();
            }
            receivetimes+=1;
            if (receivetimes==10000)
            {
                long now=System.currentTimeMillis();
                System.out.println("time:"+(now-pretime)+"\nrecive rate:"+9999999/(float)(now-pretime));
                System.out.println("timenow:"+(now));
                receivetimes=0;
            }

        } catch(Throwable t) {
            t.printStackTrace();
        }
    }
}

ここの受信者クラスのコードには、createConnection、createSession など、送信者クラスと同様のメソッドがいくつか隠されています。

レシーバーの主な方法:

public static void main(String[] args) throws JMSException, Exception {
        // TODO Auto-generated method stub
        receiver s = new receiver();
        Connection c = s.createConnection("","","tcp://localhost:6151?jms.optimizeAcknowledge=true");
        Session sess = s.createSession(c);
        Topic destination  = sess.createTopic("topic.test");   
        MessageConsumer  consumer = sess.createConsumer(destination);  
        consumer.setMessageListener(new receiver());   
    }

すべてのコンシューマはスタンドアロン プロセスにあります。3 つのコンシューマーと 1 つのプロデューサーを実行したところ、パフォーマンスが低下しました。なぜ私がこれを得るのか誰にも分かりますか?

4

1 に答える 1

0

@TimBishが言ったように。問題は「同じマシン上のプロデューサー、コンシューマー、アクティブMQサーバー」です。分離すると、問題は発生しません。

厳密な方法で何かをテストすることはとても重要です.......

于 2013-03-20T07:08:08.687 に答える