1

リモートでSpring Batchを使用してバッチジョブを実行するコンポーネントをセットアップする状況があります。ジョブの xml パス、名前、パラメーターなどを含む JMS メッセージを送信し、呼び出し元のバッチ クライアントでサーバーからの応答を待ちます。

サーバーはキューを読み取り、適切なメソッドを呼び出してジョブを実行し、結果を返します。これは、メッセージング フレームワークによって次のように行われます。

    this.jmsTemplate.send(queueName, messageCreator);
    this.LOGGER.debug("Message sent to '" + queueName + "'");

    try {
        final Destination replyTo = messageCreator.getReplyTo();
        final String correlationId = messageCreator.getMessageId();

        this.LOGGER.debug("Waiting for the response '" + correlationId + "' back on '" + replyTo + "' ...");
        final BytesMessage message = (BytesMessage) this.jmsTemplate.receiveSelected(replyTo, "JMSCorrelationID='"
                + correlationId + "'");
        this.LOGGER.debug("Response received");

理想的には、runJobSync メソッドを 2 回呼び出して、2 つのジョブを同時に実行できるようにしたいと考えています。ジョブなしで同様のことを行う単体テストがあります。このコードはあまり素晴らしいものではありませんが、次のとおりです。

最終リスト結果 = Collections.synchronizedList(new ArrayList());

    Thread thread1 = new Thread(new Runnable(){

        @Override
        public void run() {
            client.pingWithDelaySync(1000); 
            result.add(Thread.currentThread().getName());
        }

    }, "thread1");

    Thread thread2 = new Thread(new Runnable(){

        @Override
        public void run() {
            client.pingWithDelaySync(500);              
            result.add(Thread.currentThread().getName());
        }

    }, "thread2");

    thread1.start();
    Thread.sleep(250);
    thread2.start();

    thread1.join();
    thread2.join();

    Assert.assertEquals("both thread finished", 2, result.size());
    Assert.assertEquals("thread2 finished first", "thread2", result.get(0));
    Assert.assertEquals("thread1 finished second", "thread1", result.get(1));

そのテストを実行すると、スレッド 2 は 500 ミリ秒待機するだけなので最初に完了しますが、スレッド 1 は 1 秒待機します。

Thread.sleep(delayInMs);
    return result;

それはうまくいきます。2 つのリモート ジョブを実際に実行すると、1 つは完了するのに約 50 秒かかり、もう 1 つはすぐに失敗して戻るように設計されていますが、これは起こりません。

50 秒のジョブを開始し、すぐに即時失敗ジョブを開始します。クライアントはジョブの実行を要求するメッセージを送信したことを出力し、サーバーは 50 秒の要求を受信したことを出力しますが、ThreadPoolExecutor を使用していても、2 番目のメッセージを処理する前に 50 秒のジョブが完了するまで待機します。

自動承認でトランザクションを実行しています。

リモート デバッグを行うと、AbstractPollingMessageListenerContainer の Consumer は未​​処理のメッセージを表示しません (したがって、consumer.receive() は明らかに null を何度も返すだけです)。amq ブローカーの WebGUI には、2 つのエンキュー、1 つのデキュー、1 つのディスパッチ済み、1 つのディスパッチ済みキューが表示されます。これは、何かが AMQ が消費者に 2 番目のメッセージを「持たせる」ことを妨げていることを示唆しています。(プリフェッチは 1000 btw です) これは、特定のキューの唯一のコンシューマーとして表示されます。

私自身と他の数人の開発者は、ここ数日間、あちこち探し回りましたが、ほとんど行き場がありません。これが予期された動作である場合に誤って構成されたもの、またはここで何が壊れるかについての提案。

リモートで呼び出されているメソッドはまったく問題になりますか? 現在、ジョブ ハンドラー メソッドはエグゼキューターを使用して別のスレッドでジョブを実行し、future.get() を実行します (余分なスレッドはログに関連する理由のためです)。

どんな助けでも大歓迎です

4

1 に答える 1

0

私が完全にフォローしているかどうかはわかりませんが、上から、次のことを試してみてください...

  • MessageListenerContainer で、concurrentConsumers/maxConcurrentConsumers をデフォルト (1) より大きく設定します
  • プリフェッチを 0 に設定して、コンシューマ間のメッセージのバランスをより適切に促進するなど。
于 2012-10-18T22:38:53.567 に答える