1

JMS トピックを使用して ActiveMQ 5.9.0 をメッセージ ブローカーとして設定しようとしていますが、メッセージの消費に問題があります。

テスト目的で、1 つのトピック、1 つのイベント プロデューサー、および 1 つのコンシューマーの単純な構成があります。10 個のメッセージを次々に送信しますが、アプリケーションを実行するたびに、これらのメッセージのうち 1 ~ 3 個は消費されません! 他のメッセージは消費され、正常に処理されます。ActiveMQ 管理コンソールのトピックにパブリッシュされたすべてのメッセージを確認できますが、アプリケーションを再起動してもコンシューマーには到達しません (「エンキュー」列と「デキュー」列の数字が異なっています)。

編集:トピックの代わりにキューを使用する場合、この問題は発生しないことにも言及する必要があります。

なぜこうなった?atomikos (トランザクションマネージャー) と何か関係があるのでしょうか? それとも、構成に何か他のものがありますか?どんなアイデア/提案も大歓迎です。:)

これは ActiveMQ/JMS スプリング構成です。

    <bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close">
    <property name="uniqueResourceName" value="amq" />
    <property name="xaConnectionFactory">
        <bean class="org.apache.activemq.spring.ActiveMQXAConnectionFactory"
            p:brokerURL="${activemq_url}" />
    </property>
    <property name="maxPoolSize" value="10" />
    <property name="localTransactionMode" value="false" />
</bean>

<bean id="cachedConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="connectionFactory" />
</bean>

<!-- A JmsTemplate instance that uses the cached connection and destination -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachedConnectionFactory" />
    <property name="sessionTransacted" value="true" />
    <property name="pubSubDomain" value="true"/>
</bean>

<bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="test.topic" />
</bean>

<!-- The Spring message listener container configuration -->
<jms:listener-container destination-type="topic"
    connection-factory="connectionFactory" transaction-manager="transactionManager"
    acknowledge="transacted" concurrency="1">
    <jms:listener destination="test.topic" ref="testReceiver"
        method="receive" />
</jms:listener-container>

プロデューサー:

@Component("producer")
public class EventProducer {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Transactional
    public void produceEvent(String message) {
        this.jmsTemplate.convertAndSend("test.topic", message);
    }
}

消費者:

@Component("testReceiver")
public class EventListener {

    @Transactional
    public void receive(String message) {
        System.out.println(message);
    }  
}

テスト:

    @Autowired
    private EventProducer eventProducer;

    public void testMessages() {

    for (int i = 1; i <= 10; i++) {
        this.eventProducer.produceEvent("message" + i);
    }
4

1 に答える 1

4

これが JMS トピックの性質です。現在のサブスクライバーのみがデフォルトでメッセージを受信します。競合状態があり、コンテナーが開始された後、コンシューマーがサブスクリプションを確立する前にメッセージを送信しています。これは、同じアプリケーションで送受信するトピックを含むユニット/統合テストでよくある間違いです。

Spring の新しいバージョンでは、サブスクライバーが確立されるまで待機するためにポーリングできるメソッドがあります(3.1 以降だと思います)。または、送信を開始する前に少し待つか、サブスクリプションを永続的にすることができます。

于 2014-01-12T14:28:49.927 に答える