2

強力なスケーラビリティが必要な Java EE プロジェクトを開始しています。これまでのコンセプトは次のとおりでした。

  • アーキテクチャのさまざまな部分を担当するいくつかのメッセージ駆動型 Bean
  • 各 MDB には、ビジネス ロジックを処理するセッション Bean が注入されています。
  • 永続層へのアクセスを提供するエンティティ Bean のカップル
  • JMS メッセージを介した要求/応答の概念による、アーキテクチャのさまざまな部分間の通信:
    • MDB がアクティビティ リクエストを含むメッセージを受信する
    • セッション Bean を使用して必要なビジネス ロジックを実行する
    • msg 内の応答オブジェクトを元のリクエスタに返します

アイデアは、メッセージ バスを介してアーキテクチャの部分を互いに分離することにより、スケーラビリティに制限がないというものでした。より多くのコンポーネントを開始するだけです。それらが同じバスに接続されている限り、どんどん成長していくことができます。

残念ながら、リクエストとリプライの概念に大きな問題があります。トランザクション管理は私たちの邪魔をしているようです。セッション Bean がメッセージを消費することになっていないと思われますか?!

http://blogs.oracle.com/fkieviet/entry/request_reply_from_an_ejbhttp://forums.sun.com/message.jspa?messageID=10338789を読むと、人々が実際にリクエスト/リプライの概念に反対することを推奨しているように感じますEJB。

その場合、EJB 間でどのように通信しますか? (スケーラビリティは私が求めているものであることを忘れないでください)

現在のセットアップの詳細:

  • MDB 1 'TestController'、ビジネス ロジックに (ローカル) SLSB 1 'TestService' を使用
  • TestController.onMessage() は、TestService にキュー XYZ にメッセージを送信させ、応答を要求します。
    • TestService は Bean 管理のトランザクションを使用します
    • TestService は、初期化時にジョイント接続ファクトリを介して JMS ブローカへの接続とセッションを確立します (@PostConstruct)
    • TestService は送信後にトランザクションをコミットし、別のトランザクションを開始して応答を 10 秒間待機します。
  • メッセージは、ビジネス ロジックに (ローカル) SLSB 2 'LocationService' を使用する MDB 2 'LocationController' に到達します。
  • LocationController.onMessage() は、LocationServiceが要求された JMSReplyTo キューに メッセージを送り返すようにします。
    • 同じ BMT コンセプト、同じ @PostConstruct コンセプト
  • すべてが同じ接続ファクトリーを使用してブローカーにアクセスします

問題: 最初のメッセージが (SLSB 1 によって) 送信され、(MDB 2 によって) 受信されます。返信メッセージの送信 (SLSB 2 による) も問題ありません。ただし、SLSB 1 は何も受信せず、タイムアウトするだけです。

messageSelector なしで試してみましたが、変更はありませんが、まだメッセージを受信して​​いません。

セッション Bean でメッセージを消費しても問題ないですか?

SLSB 1 - TestService.java

@Resource(name = "jms/mvs.MVSControllerFactory")
private javax.jms.ConnectionFactory connectionFactory;

@PostConstruct
public void initialize() {
    try {
      jmsConnection = connectionFactory.createConnection();
      session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      System.out.println("Connection to JMS Provider established");
    } catch (Exception e) { }
}

public Serializable sendMessageWithResponse(Destination reqDest, Destination respDest, Serializable request) {
    Serializable response = null;

    try {
        utx.begin();
        Random rand = new Random();
        String correlationId = rand.nextLong() + "-" + (new Date()).getTime();

        // prepare the sending message object
        ObjectMessage reqMsg = session.createObjectMessage();
        reqMsg.setObject(request);
        reqMsg.setJMSReplyTo(respDest);
        reqMsg.setJMSCorrelationID(correlationId);

        // prepare the publishers and subscribers
        MessageProducer producer = session.createProducer(reqDest);

        // send the message
        producer.send(reqMsg);
        System.out.println("Request Message has been sent!");
        utx.commit();

        // need to start second transaction, otherwise the first msg never gets sent
        utx.begin();
        MessageConsumer consumer = session.createConsumer(respDest, "JMSCorrelationID = '" + correlationId + "'");
        jmsConnection.start();
        ObjectMessage respMsg = (ObjectMessage) consumer.receive(10000L);
        utx.commit();

        if (respMsg != null) {
            response = respMsg.getObject();
            System.out.println("Response Message has been received!");
        } else {
            // timeout waiting for response
            System.out.println("Timeout waiting for response!");
        }

    } catch (Exception e) { }

    return response;
}

SLSB 2 - LocationService.Java (応答方法のみ、残りは上記と同じ)

public boolean reply(Message origMsg, Serializable o) {
    boolean rc = false;

    try {
        // check if we have necessary correlationID and replyTo destination
        if (!origMsg.getJMSCorrelationID().equals("") && (origMsg.getJMSReplyTo() != null)) {
            // prepare the payload
            utx.begin();
            ObjectMessage msg = session.createObjectMessage();
            msg.setObject(o);

            // make it a response
            msg.setJMSCorrelationID(origMsg.getJMSCorrelationID());
            Destination dest = origMsg.getJMSReplyTo();

            // send it
            MessageProducer producer = session.createProducer(dest);
            producer.send(msg);
            producer.close();
            System.out.println("Reply Message has been sent");
            utx.commit();

            rc = true;
        }

    } catch (Exception e) {}

    return rc;
}

sun-resources.xml

<admin-object-resource enabled="true" jndi-name="jms/mvs.LocationControllerRequest"  res-type="javax.jms.Queue"  res-adapter="jmsra">
    <property name="Name" value="mvs.LocationControllerRequestQueue"/>
</admin-object-resource>
<admin-object-resource enabled="true" jndi-name="jms/mvs.LocationControllerResponse"  res-type="javax.jms.Queue"  res-adapter="jmsra">
    <property name="Name" value="mvs.LocationControllerResponseQueue"/>
</admin-object-resource>

<connector-connection-pool name="jms/mvs.MVSControllerFactoryPool"  connection-definition-name="javax.jms.QueueConnectionFactory"  resource-adapter-name="jmsra"/>
<connector-resource enabled="true" jndi-name="jms/mvs.MVSControllerFactory" pool-name="jms/mvs.MVSControllerFactoryPool"  />
4

1 に答える 1

1

JMS を使用している場合でも、要求/応答パターンは依然として本質的に同期的です。発信者はメッセージを送信し、応答を待ちます。これは分散トランザクションのために複雑であるだけでなく、応答を待っている間に 1 つまたは複数のリソース (少なくともこの場合はスレッド) が割り当てられて無駄になることを意味します。このようにスケーリングすることはできません。スレッドの数によって本質的に制限されます。

真にスケーラブルな JMS アーキテクチャを実現するには、すべてを非同期にする必要があります。言い換えれば、決して待つべきではありません。送受信されるメッセージは、次のアクティビティをトリガーするために必要な情報を渡す必要があります。

メッセージのサイズが大きすぎる場合は、識別子のみを格納し、対応するデータをデータベースに格納できます。しかし、データベースは再び争点になります。

さまざまなメッセージが、どの長時間実行プロセスに参加しているかを知る必要がある場合は、相関識別子を使用することもできます。メッセージが受信されると、受信側は相関識別子を使用して長時間実行されるアクティビティを「再開」できます。これは BPEL の伝統的なパターンです。同期要求/応答と相関識別子を使用した非同期メッセージの主な違いは、各ステップ間でリソースを解放できることです。後者ではスケーリングできますが、最初のものではスケーリングできません。

正直なところ、私はあなたの長い投稿に混乱し、あなたの設計がどちらかというと非同期 (かつ正しい) なのか、それともリクエスト/リプライと同期している (そして問題がある) のかがわかりませんでした。しかし、答えの要素を提供できれば幸いです。

いずれにせよ、Web サイトEnterprise Integration Patternsにアクセスしてください。これは貴重な情報源です。

于 2010-03-23T11:54:00.420 に答える