1

必要なもの: 1. jms を介してメッセージを送信します。2. クライアントから処理されず、サーバーが再起動した場合にクライアントがサーバーの再起動時にメッセージを取得できるように、メッセージを永続化します。3. メッセージは、これらのメッセージが送信されたのと同じ順序でクライアントから受信する必要があります。4. キューに複数のコンシューマーを配置して、さまざまなメッセージを並行してスケーリングおよび処理できるようにします。

私がやったこと:私はアクティブなmqを使用し、次の春の構成を持っています

<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="destination"/>
</bean>

<bean id="template" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="pooledConnectionFactory"/>
    <property name="deliveryMode" value="2"/>
    <property name="defaultDestination" ref="destination"/>
</bean>

<bean id="messageListener" class="InternalNotificationJMSConsumer"/>

<jms:listener-container connection-factory="pooledConnectionFactory" acknowledge="client" destination-type="queue" concurrency="5-10" container-type="simple"  cache="none" client-id="client1" prefetch="1">
      <jms:listener destination="destination" ref="messageListener" method="onMessage"/>
</jms:listener-container>

これで宛先と接続プールができました。次に、春の jmstemplate を作成し、メッセージの送信を開始します

template.send(new MessageCreator() {
  public Message createMessage(Session session) throws JMSException {
    return session.createObjectMessage(message);
  }
});

それから私は次のように受信機を持っています:

@Component
public class JMSConsumer implements MessageListener {

  @Override
  public void onMessage(Message message) {
    if (message instanceof ObjectMessage) {
      ....
     }
   }

私の問題:

を。この構成を使用するメッセージは、順番どおりに受信されません。これは、listener-container.concurency の xsd コメントを考えると理にかなっています。したがって、問題は、注文を維持できるが、複数の消費者がいる方法/場合です。

b. メッセージ確認。私が必要とするのは、メッセージが処理された後、onMessage が返され、メッセージが確認済みと見なされ、再送信されないことです。私はアプリケーションサーバーの内部にいないという点でトランザクションマネージャーがなく、可能であればこれを避けたいと思っています。私は自分自身のメッセージを確認するだけで済みます。これは可能ですか?

4

2 に答える 2

1

メッセージを並行して消費する場合、スレッド化の性質上、メッセージが送信された順序どおりに完了するという保証はありません。順序どおりにコンシューマーに渡されるということだけです。確実に順序付けする必要がある特定のメッセージ (通常はトランザクション/アカウントに関連する) のみがある場合、メッセージ グループ機能を使用して、グループをコンシューマーに関連付けることができます。これは一種のスティッキー ロード バランシングです。

自分でメッセージを確認することは間違いなく可能ですが、おそらく必要ありません。MessageListenerContainerには属性acknowledgeがあり、これを設定すると、例外がスローされることなく、実行が完了するとtransactedメッセージが消費されたとのみ認識されます。MessageListenerトランザクション マネージャーは必要ありません (ただし、Spring のPlatformTransactionManagerコンテナーを使用したい場合は、特定のコンテナーは必要ありません)。

確実にメッセージを自分で確認したい場合は、 に設定acknowledgeし、メッセージが正常に完了したときにclient呼び出します。message.acknowledge()これが呼び出されずにリスナーが完了すると、メッセージは DLQ (デフォルト) に送信されるか、再配信されます。

于 2013-09-11T11:02:33.397 に答える
1

これが私の問題に対する答えです。

<jms:listener-container connection-factory="pooledConnectionFactory" acknowledge="client" destination-type="queue" concurrency="5-10" container-type="simple"  cache="none" prefetch="1">
      <jms:listener destination="destination" ref="messageListener" method="onMessage"/>
</jms:listener-container>

次に、メッセージ リスナーの実装について: BlockingExecutor executor = new BlockingExecutor(5); //消費される同時メッセージの数

public void onMessage(Message message) {
....
executor.submitTask(new MessageDispatchCommand((ObjectMessage) message));
....

}

class MessageDispatchCommand implements Runnable {
  private ObjectMessage message;

  public MessageDispatchCommand (final ObjectMessage message) {
    this.message = message;
  }

  public void run() {
    try {
      Serializable msg = message.getObject();
      handle message here
     }....
}

BlockingExecutor コードは、ここから見つけることができます: Java Concurrency in Practice: BoundedExecutor 実装

簡単に言えば。これによりメッセージの順序が正しく保たれるため、jms メッセージリスナーは 1 つしかありません。次に、メッセージを取得して処理するための実行可能なコマンドを作成します (これは時間のかかる作業ですが、別のスレッド内で行われます)。すべてのメッセージ処理スレッドが使い果たされたときにboundedexecutorを使用するため、onMessageブロックと後続のメッセージは、bounded executorのスレッドが再び解放されるまで単純に永続化されます。

于 2013-09-17T14:52:44.680 に答える