24

「TemporaryQueue」を使用してJMS要求/応答メカニズムを使用する場合、そのコードはスケーラブルになりますか?

現在のところ、1秒あたり100リクエストをサポートするのか、1秒あたり数千リクエストをサポートするのかはわかりません。

以下のコードは、私が実装しようと考えているものです。これは、「同期」方式でJMSを利用します。重要な部分は、このセッション用に作成された「一時キュー」を指すために「コンシューマー」が作成される場所です。このような一時キューの使用がスケーラブルな設計であるかどうかはわかりません。

  destination = session.createQueue("queue:///Q1");
  producer = session.createProducer(destination);
  tempDestination = session.createTemporaryQueue();
  consumer = session.createConsumer(tempDestination);

  long uniqueNumber = System.currentTimeMillis() % 1000;
  TextMessage message = session
      .createTextMessage("SimpleRequestor: Your lucky number today is " + uniqueNumber);

  // Set the JMSReplyTo
  message.setJMSReplyTo(tempDestination);

  // Start the connection
  connection.start();

  // And, send the request
  producer.send(message);
  System.out.println("Sent message:\n" + message);

  // Now, receive the reply
  Message receivedMessage = consumer.receive(15000); // in ms or 15 seconds
  System.out.println("\nReceived message:\n" + receivedMessage);

アップデート:

別のパターンに出くわしました。このブログを参照してください 。アイデアは、送信と受信の両方に「通常の」キューを使用することです。ただし、「同期」呼び出しの場合、目的の応答を取得する(つまり、要求に一致させる)ために、「セレクター」を使用して受信キューをリッスンするコンシューマーを作成します。

手順:

    // 1. Create Send and Receive Queue.
    // 2. Create a msg with a specific ID
 final String correlationId = UUID.randomUUID().toString();
 final TextMessage textMessage = session.createTextMessage( msg );
 textMessage.setJMSCorrelationID( correlationId );

    // 3. Start a consumer that receives using a 'Selector'.
           consumer = session.createConsumer( replyQueue, "JMSCorrelationID = '" + correlationId + "'" );

したがって、このパターンの違いは、新しいリクエストごとに新しい一時キューを作成しないことです。代わりに、すべての応答は1つのキューにのみ送信されますが、「セレクター」を使用して、各要求スレッドが関心のある応答のみを受信するようにします。

ここでの欠点は、「セレクター」を使用する必要があることだと思います。それが前述のパターンよりも好まれないのか、それとも好まれるのかはまだわかりません。考え?

4

7 に答える 7

6

投稿の更新に関して-相関IDを使用する場合と同様に、メッセージヘッダーに対してセレクターを実行すると、セレクターは非常に効率的です。Spring Integrationは、JMSアウトバウンドゲートウェイを実装するためにこれを内部的にも実行します。

于 2012-05-29T01:08:41.183 に答える
4

興味深いことに、これのスケーラビリティは、実際には他の応答が説明したものと反対である可能性があります。

WebSphere MQは、可能な場合、動的キュー・オブジェクトを保存して再利用します。したがって、動的キューの使用は無料ではありませんが、キューが解放されると、新しいキューインスタンスを要求する次のスレッドにハンドルを渡すだけでよいため、拡張性は高くなります。ビジーなQMgrでは、ハンドルがスレッドからスレッドに渡される間、動的キューの数は比較的静的なままになります。厳密に言えば、単一のキューを再利用するほど速くはありませんが、悪くはありません。

一方、インデックス作成CORRELIDは高速ですが、パフォーマンスはインデックス内のメッセージ数に反比例します。また、キューの深さが増し始めた場合にも違いが生じます。アプリが空のキューで続行する場合、GET遅延WAITはありません。ただし、深いキューでは、QMgrは既存のメッセージのインデックスを検索して、応答メッセージがそれらの中にないことを確認する必要があります。あなたの例では、それは空のインデックスを検索することと大きなインデックスを1秒間に1,000回検索することの違いです。

CORRELIDその結果、アプリの特性と負荷によっては、それぞれ1つのメッセージを含む1000の動的キューが、1000のスレッドを通過する単一のキューよりも実際に高速になる可能性があります。特定の設計に取り組む前に、これを大規模にテストすることをお勧めします。

于 2013-02-09T07:24:58.950 に答える
2

共有キューの相関IDでセレクターを使用すると、複数のコンシューマーで非常に適切に拡張できます。

ただし、1000リクエスト/秒は多くなります。パフォーマンスに問題があることが判明した場合は、負荷を異なるインスタンス間で少し分割することをお勧めします。

リクエストとクライアントの数について詳しく説明することをお勧めします。クライアントの数が10未満で、かなり静的なままであり、要求数が非常に多い場合、最も回復力があり高速な解決策は、クライアントごとに静的な応答キューを用意することです。

于 2012-05-29T10:40:17.877 に答える
1

一時キューの作成は無料ではありません。結局のところ、それはブローカーにリソースを割り当てています。そうは言っても、(事前に)未知の数の潜在的にバインドされていないクライアント(複数のJVM、JVMごとの複数の同時スレッドなど)がある場合は、選択の余地がない可能性があります。クライアントキューを割り当ててクライアントに割り当てると、すぐに手に負えなくなります。

確かに、あなたがスケッチしたものは、可能な限り最も単純な解決策です。そして、トランザクション量の実数を取得でき、それが十分にスケーリングできる場合は、問題ありません。

一時的なキューを回避することを検討する前に、クライアントの数を制限し、クライアントを長寿命にすることを検討しました。つまり、クライアント側でクライアントプールを作成し、プール内のクライアントに起動時に一時キュー、セッション、接続などを作成させ、後続の要求でそれらを再利用し、シャットダウン時にそれらを破棄します。次に、チューニングの問題は、プールの最大/最小サイズ、プールを整理するためのアイドル時間、およびプールが最大になったときの動作(失敗とブロック)のいずれかになります。任意の数の一時的なJVMを作成している場合を除いて(この場合、JVMの起動オーバーヘッドだけでスケーリングの問題が大きくなります)、それは何でもスケーリングする必要があります。結局のところ、その時点で、割り当てるリソースはシステムの実際の使用状況を反映しています。

避けるべきことは、多数の不必要なキュー、セッション、接続などを作成して破棄することです。サーバー側を設計して、最初からストリーミングできるようにします。次に、必要に応じてプールします。そうではないように、重要なことは何でもする必要があります。

于 2012-05-28T15:50:23.623 に答える
0

一時キューを使用すると、毎回relyToProducersを作成するコストがかかります。静的replyToQueueにキャッシュされたプロデューサーを使用する代わりに、メソッドcreateProducerはコストが高く、同時呼び出しの多い環境でのパフォーマンスに影響を与えます。

于 2014-07-02T04:23:15.137 に答える
0

私は同じ問題に直面していて、ステートレスBean内に接続を自分でプールすることにしました。1つのクライアント接続には1つのtempQueueがあり、1つのBeanインスタンスにバインドされているJMSMessageExchangerオブジェクト(connectionFactory、Queue、およびtempQueueを含む)内にあります。JSE/EE環境でテストしました。しかし、GlassfishのJMSプールの動作についてはよくわかりません。Beanメソッドが終了した後に「手動で」取得したJMS接続を実際に閉じますか?私はひどく間違ったことをしていますか?

また、クライアントBean(TransactionAttributeType.NOT_SUPPORTED)でトランザクションをオフにして、要求メッセージを要求キューにすぐに送信しました。

package net.sf.selibs.utils.amq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import lombok.Getter;
import lombok.Setter;
import net.sf.selibs.utils.misc.UHelper;

public class JMSMessageExchanger {

    @Setter
    @Getter
    protected long timeout = 60 * 1000;

    public JMSMessageExchanger(ConnectionFactory cf) {
        this.cf = cf;
    }

    public JMSMessageExchanger(ConnectionFactory cf, Queue queue) {
        this.cf = cf;
        this.queue = queue;
    }
    //work
    protected ConnectionFactory cf;
    protected Queue queue;
    protected TemporaryQueue tempQueue;
    protected Connection connection;
    protected Session session;
    protected MessageProducer producer;
    protected MessageConsumer consumer;
    //status
    protected boolean started = false;
    protected int mid = 0;

    public Message makeRequest(RequestProducer producer) throws Exception {
        try {
            if (!this.started) {
                this.init();
                this.tempQueue = this.session.createTemporaryQueue();
                this.consumer = this.session.createConsumer(tempQueue);
            }
            //send request
            Message requestM = producer.produce(this.session);
            mid++;
            requestM.setJMSCorrelationID(String.valueOf(mid));
            requestM.setJMSReplyTo(this.tempQueue);
            this.producer.send(this.queue, requestM);
            //get response
            while (true) {
                Message responseM = this.consumer.receive(this.timeout);
                if (responseM == null) {
                    return null;
                }
                int midResp = Integer.parseInt(responseM.getJMSCorrelationID());
                if (mid == midResp) {
                    return responseM;
                } else {
                    //just get other message
                }
            }

        } catch (Exception ex) {
            this.close();
            throw ex;
        }
    }

    public void makeResponse(ResponseProducer producer) throws Exception {
        try {
            if (!this.started) {
                this.init();
            }
            Message response = producer.produce(this.session);
            response.setJMSCorrelationID(producer.getRequest().getJMSCorrelationID());
            this.producer.send(producer.getRequest().getJMSReplyTo(), response);

        } catch (Exception ex) {
            this.close();
            throw ex;
        }
    }

    protected void init() throws Exception {
        this.connection = cf.createConnection();
        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.producer = this.session.createProducer(null);
        this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        this.connection.start();
        this.started = true;
    }

    public void close() {
        UHelper.close(producer);
        UHelper.close(consumer);
        UHelper.close(session);
        UHelper.close(connection);
        this.started = false;
    }

}

クライアント(ステートレスBean)とサーバー(@MessageDriven)で同じクラスが使用されます。RequestProducerとResponseProducerはインターフェースです。

package net.sf.selibs.utils.amq;

import javax.jms.Message;
import javax.jms.Session;

public interface RequestProducer {
    Message produce(Session session) throws Exception;
}
package net.sf.selibs.utils.amq;

import javax.jms.Message;

public interface  ResponseProducer extends RequestProducer{
    void setRequest(Message request);
    Message getRequest();
}

また、AMQを介した要求/応答の実装に関するAMQの記事を読みました:http: //activemq.apache.org/how-should-i-implement-request-response-with-jms.html

于 2015-10-02T06:43:36.713 に答える
-1

手遅れかもしれませんが、今週は同期要求/応答をJMS内で機能させるために数時間を費やしました。タイムアウトを使用してQueueRequesterを拡張するのはどうですか。私が行ったところ、少なくとも1台のマシン(実行中のブローカー、リクエスター、リピーター)でテストしたところ、このソリューションは説明したソリューションよりも優れていることがわかりました。一方、それはQueueConnectionの使用に依存し、それはあなたが複数の接続を開くことを余儀なくされるかもしれないことを意味します。

于 2013-02-08T16:11:43.197 に答える