7

次のテストでは、次のシナリオをシミュレートしようとしています。

  1. メッセージキューが開始されます。
  2. メッセージ処理中に失敗するように設計されたコンシューマーが開始されます。
  3. メッセージが生成されます。
  4. コンシューマーはメッセージの処理を開始します。
  5. 処理中に、メッセージ処理の失敗をシミュレートするために例外がスローされます。失敗したコンシューマーは停止します。
  6. 別の消費者は、再配信されたメッセージを受け取ることを目的として開始されます。

しかし、私のテストは失敗し、メッセージは新しいコンシューマーに再配信されません。これに関するヒントをいただければ幸いです。

MessageProcessingFailureAndReprocessingTest.java

@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig",
        loader=JavaConfigContextLoader.class)
public class MessageProcessingFailureAndReprocessingTest  extends AbstractJUnit4SpringContextTests {
    @Autowired
    private FailureReprocessTestScenario testScenario;

    @Before
    public void setUp() {
        testScenario.start();
    }

    @After
    public void tearDown() throws Exception {
        testScenario.stop();
    }

    @Test public void 
    should_reprocess_task_after_processing_failure() {
        try {
            Thread.sleep(20*1000);

            assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{
                    "task-1",
            })));
        } catch (InterruptedException e) {
            fail();
        }
    }

    @Configurable
    public static class FailureReprocessTestScenario {
        @Autowired
        public BrokerService broker;

        @Autowired
        public MockTaskProducer mockTaskProducer;

        @Autowired
        public FailingWorker failingWorker;

        @Autowired
        public SucceedingWorker succeedingWorker;

        @Autowired
        public TaskScheduler scheduler;

        public void start() {
            Date now = new Date();
            scheduler.schedule(new Runnable() {
                public void run() { failingWorker.start(); }
            }, now);

            Date after1Seconds = new Date(now.getTime() + 1*1000);
            scheduler.schedule(new Runnable() {
                public void run() { mockTaskProducer.produceTask(); }
            }, after1Seconds);

            Date after2Seconds = new Date(now.getTime() + 2*1000);
            scheduler.schedule(new Runnable() {
                public void run() {
                    failingWorker.stop();
                    succeedingWorker.start();
                }
            }, after2Seconds);
        }

        public void stop() throws Exception {
            succeedingWorker.stop();
            broker.stop();
        }
    }

    @Configuration
    @ImportResource(value={"classpath:applicationContext-jms.xml",
            "classpath:applicationContext-task.xml"})
    public static class ContextConfig {
        @Autowired
        private ConnectionFactory jmsFactory;

        @Bean
        public FailureReprocessTestScenario testScenario() {
            return new FailureReprocessTestScenario();
        }

        @Bean
        public MockTaskProducer mockTaskProducer() {
            return new MockTaskProducer();
        }

        @Bean
        public FailingWorker failingWorker() {
            TaskListener listener = new TaskListener();
            FailingWorker worker = new FailingWorker(listenerContainer(listener));
            listener.setProcessor(worker);
            return worker;
        }

        @Bean
        public SucceedingWorker succeedingWorker() {
            TaskListener listener = new TaskListener();
            SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener));
            listener.setProcessor(worker);
            return worker;
        }

        private DefaultMessageListenerContainer listenerContainer(TaskListener listener) {
            DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
            listenerContainer.setConnectionFactory(jmsFactory);
            listenerContainer.setDestinationName("tasksQueue");
            listenerContainer.setMessageListener(listener);
            listenerContainer.setAutoStartup(false);
            listenerContainer.initialize();
            return listenerContainer;
        }

    }

    public static class FailingWorker implements TaskProcessor {
        private Logger LOG = Logger.getLogger(FailingWorker.class.getName());

        private final DefaultMessageListenerContainer listenerContainer;

        public FailingWorker(DefaultMessageListenerContainer listenerContainer) {
            this.listenerContainer = listenerContainer;
        }

        public void start() {
            LOG.info("FailingWorker.start()");
            listenerContainer.start();
        }

        public void stop() {
            LOG.info("FailingWorker.stop()");
            listenerContainer.stop();
        }

        @Override
        public void processTask(Object task) {
            LOG.info("FailingWorker.processTask(" + task + ")");
            try {
                Thread.sleep(1*1000);
                throw Throwables.propagate(new Exception("Simulate task processing failure"));
            } catch (InterruptedException e) {
                LOG.log(Level.SEVERE, "Unexpected interruption exception");
            }
        }
    }

    public static class SucceedingWorker implements TaskProcessor {
        private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName());

        private final DefaultMessageListenerContainer listenerContainer;

        public final List<String> processedTasks;

        public SucceedingWorker(DefaultMessageListenerContainer listenerContainer) {
            this.listenerContainer = listenerContainer;
            this.processedTasks = new ArrayList<String>();
        }

        public void start() {
            LOG.info("SucceedingWorker.start()");
            listenerContainer.start();
        }

        public void stop() {
            LOG.info("SucceedingWorker.stop()");
            listenerContainer.stop();
        }

        @Override
        public void processTask(Object task) {
            LOG.info("SucceedingWorker.processTask(" + task + ")");
            try {
                TextMessage taskText = (TextMessage) task;
                processedTasks.add(taskText.getText());
            } catch (JMSException e) {
                LOG.log(Level.SEVERE, "Unexpected exception during task processing");
            }
        }
    }

}

TaskListener.java

public class TaskListener implements MessageListener {

    private TaskProcessor processor;

    @Override
    public void onMessage(Message message) {
        processor.processTask(message);
    }

    public void setProcessor(TaskProcessor processor) {
        this.processor = processor;
    }

}

MockTaskProducer.java

@Configurable
public class MockTaskProducer implements ApplicationContextAware {
    private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName());

    @Autowired
    private JmsTemplate jmsTemplate;

    private Destination destination;

    private int taskCounter = 0;

    public void produceTask() {
        LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")");

        taskCounter++;

        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage message = session.createTextMessage("task-" + taskCounter);
                return message;
            }
        });
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        destination = applicationContext.getBean("tasksQueue", Destination.class);
    }
}
4

1 に答える 1

8

どうやら私が昨日探していたドキュメントのソースは、ロバストなJMSアプリケーションの作成はある意味で私を誤解させました(または私はそれを間違って理解したかもしれません)。特にその抜粋:

JMSメッセージが確認されるまで、正常に消費されたとは見なされません。メッセージの正常な消費は通常、3つの段階で行われます。

  1. クライアントはメッセージを受信します。
  2. クライアントはメッセージを処理します。
  3. メッセージは確認されます。確認応答は、セッション確認応答モードに応じて、JMSプロバイダーまたはクライアントのいずれかによって開始されます。

AUTO_ACKNOWLEDGEがまさにそれを行うと仮定しました-リスナーメソッドが結果を返した後にメッセージを確認しました。ただし、JMS仕様によると、これは少し異なり、Springリスナーコンテナは予想どおり、JMS仕様からの動作を変更しようとはしません。これは、AbstractMessageListenerContainerのjavadocが言っていることです-私は重要な文を強調しました:

リスナーコンテナは、次のメッセージ確認オプションを提供します。

  • 「sessionAcknowledgeMode」を「AUTO_ACKNOWLEDGE」に設定(デフォルト):リスナー実行前の自動メッセージ確認応答。例外がスローされた場合の再配信はありません。
  • 「sessionAcknowledgeMode」を「CLIENT_ACKNOWLEDGE」に設定:リスナーの実行が成功した後のメッセージの自動確認。例外がスローされた場合の再配信はありません。
  • 「sessionAcknowledgeMode」を「DUPS_OK_ACKNOWLEDGE」に設定:リスナーの実行中または実行後の遅延メッセージ確認。例外がスローされた場合の再配信の可能性。
  • 「sessionTransacted」を「true」に設定:リスナーの実行が成功した後のトランザクション確認。例外がスローされた場合の再配信が保証されます。

だから私の解決策の鍵はlistenerContainer.setSessionTransacted(true);

私が直面したもう1つの問題は、JMSプロバイダーが、メッセージの処理中に失敗した同じコンシューマーに失敗したメッセージを再配信し続けることでした。JMS仕様がそのような状況でプロバイダーが何をすべきかを規定しているかどうかはわかりませんが、私にとってうまくいったlistenerContainer.shutdown();のは、障害のあるコンシューマーを切断し、プロバイダーがメッセージを再配信して、別の消費者。

于 2012-03-27T08:51:32.217 に答える