1

ActiveMQ を使用して配信不能キューを実装しようとしています。残念ながら、この点に関するドキュメントはいくつかの面でかなり曖昧であり、すべてを適切にセットアップすることはできないようです.

次の Bean を構成しています。

@Bean
public JmsTemplate createJMSTemplate() {
    logger.info("createJMSTemplate");
    JmsTemplate jmsTemplate = new JmsTemplate(getActiveMQConnectionFactory());
    jmsTemplate.setDefaultDestinationName(queue);
    jmsTemplate.setDeliveryPersistent(true);
    jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
    return jmsTemplate;
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(getActiveMQConnectionFactory());
    factory.setConcurrency("1-10");
    factory.setSessionTransacted(false);
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
}

@Bean
public ConnectionFactory getActiveMQConnectionFactory() {
    // Configure the ActiveMQConnectionFactory
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL("tcp://127.0.0.1:61616");
    activeMQConnectionFactory.setTrustedPackages(Arrays.asList("com.company"));

    // Configure the redeliver policy and the dead letter queue
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(0);
    redeliveryPolicy.setRedeliveryDelay(10000);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(3);
    RedeliveryPolicyMap redeliveryPolicyMap = activeMQConnectionFactory.getRedeliveryPolicyMap();
    redeliveryPolicyMap.put(new ActiveMQQueue(queue), redeliveryPolicy);
    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

    return activeMQConnectionFactory;
}

そして、これは私の受信コードです:

@Autowired
private ConnectionFactory connectionFactory;

private static Logger logger = LoggerFactory.getLogger(QueueReceiver.class);
private Connection connection;
private Session session;
private SegmentReceiver callback;

@PostConstruct
private void init() throws JMSException, InterruptedException {
    logger.info("Initializing QueueReceiver...");
    this.connection = connectionFactory.createConnection();
    this.session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    Queue q = session.createQueue(queue);
    logger.info("Creating consumer for queue '{}'", q.getQueueName());
    MessageConsumer consumer = session.createConsumer(q);
    this.callback = new SegmentReceiver();
    consumer.setMessageListener(callback);
    this.connection.start();
}

@PreDestroy
private void destroy() throws JMSException {
    logger.info("Destroying QueueReceiver...");
    this.session.close();
    this.connection.close();
}

private class SegmentReceiver implements MessageListener {

    @Override
    public void onMessage(Message message) {
        logger.info("onMessage");
        try {
            TextMessage textMessage = (TextMessage) message;
            Segment segment = Segment.fromJSON(textMessage.getText());
            if (segment.shouldFail()) {
                throw new IOException("This segment is expected to fail");
            }
            System.out.println(segment.getText());
            message.acknowledge();
        }
        catch(IOException | JMSException exception) {
            logger.error(exception.toString());
            try {
                QueueReceiver.this.session.rollback();
            } catch (JMSException e) {
                logger.error(e.toString());
            }
            throw new RuntimeException(exception);
        }
    }

}

しかし、何も起こりません。デフォルト構成を使用して、すぐに使用できる Apache ActiveMQ 5.14.2 セットアップを使用しています。ここで何が欠けていますか?

4

2 に答える 2

2

あなたが使用しているため、 this.session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
呼び出しは呼び出しmessage.acknowledge();と同じですsession.acknowledge();

構成で ActiveMQ 再配信が正常に機能するようにするには、最小限の変更でいくつかの可能性があります。

  1. 呼ぶQueueReceiver.this.session.recover();
    代わりに呼ぶ QueueReceiver.this.session.rollback();

void org.apache.activemq.ActiveMQSession.recover() が JMSException をスローする

このセッションでのメッセージ配信を停止し、最も古い未確認メッセージからメッセージ配信を再開します。

すべてのコンシューマーは、メッセージを順番に配信します。受信したメッセージを確認すると、クライアントに配信されたすべてのメッセージが自動的に確認されます。

セッションを再開すると、次のアクションが実行されます。 • メッセージ配信の停止 • 配信された可能性があるが確認応答されていないすべてのメッセージを「再配信」としてマーク • 以前に配信されたすべての未確認メッセージを含む配信シーケンスを再開します。再配信されたメッセージは、元の配信順序どおりに配信される必要はありません。

  1. this.session = connection.createSession(false, org.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); このメソッドを呼び出さ ないこと ((org.apache.activemq.command.ActiveMQMessage) message ).acknowledge();はロールバックのようなものであることに注意してください。つまり、メッセージは確認されず、メソッドで例外をスローするとorg.apache.activemq.ActiveMQMessageConsumer.rollback()onMessageが呼び出されます。QueueReceiver.this.consumer.rollback();

  2. 呼び出しの代わりに単にQueueReceiver.this.consumer.rollback(); org.apache.activemq.ActiveMQMessageConsumer.rollback()を呼び出すQueueReceiver.this.session.rollback();

于 2016-12-16T13:42:52.237 に答える