4

複数のCompeting Consumersによって同時に処理される必要があるRequest/Reply交換の実装に苦労しています。

タスクのキューの生成を担当するスタンドアロンのマスターモジュールが 1 つあります。そして、そのキューからのメッセージを同時に消費するべき多くのWorkerモジュールがあります。

これはCamel ルーティングのMaster部分です。

from("direct:start")
.to("log:FROM.DIRECT?level=DEBUG")
.split(body()).setHeader(CamelHeader.TASKS_BATCH_ID, simple("BATCH-1"))
.setHeader(CamelHeader.TASK_TYPE, simple(TaskType.FETCH_INDEX))
.process(new Processor() {
    @Override
    public void process(Exchange exchange) throws Exception {
        EdgarFullIndexLocation location = 
            exchange.getIn().getBody(EdgarFullIndexLocation.class);
        exchange.getIn().setBody(location.getId().toJson(), String.class);
    }
})
.to("log:SPLIT?level=DEBUG")
.setExchangePattern(ExchangePattern.InOut)
.to("activemq:queue:tasksQueue?replyTo=completionsQueue" +
    //"&transactedInOut=true" + 
    "&requestTimeout=" + Integer.MAX_VALUE +
    "&disableTimeToLive=true")
.threads(10)
.to("log:RESPONSE?level=DEBUG")
.routeId(routeId);

これは、Camel ルーティングのWorker部分で、ここでキューを使用します。

from("activemq:queue:tasksQueue?asyncConsumer=true" + 
    "&concurrentConsumers=10")
.to("log:FROM.TASKS.QUEUE?level=DEBUG")
.choice()
    .when(header(CamelHeader.TASK_TYPE).isEqualTo(TaskType.FETCH_INDEX))
        .process(new FetchIndexTaskProcessor())
    .otherwise()
        .to("log:UNKNOWN.TASK?level=DEBUG");

ここでFetchIndexTaskProcessorは AsyncProcessorを実装します:

public class FetchIndexTaskProcessor implements AsyncProcessor {
    @Override public void process(Exchange exchange) throws Exception {}

    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        FetchIndexTask task = new FetchIndexTask(exchange, callback);
        task.start();
        return false;
    }

}

ここでFetchIndexTaskThreadを拡張します。start()の後、新しいスレッドは以下を担当します。

  1. ルートを動的に追加します。
  2. そのルートの交換が完了するまでブロックします。
  3. 元の交換への返信を準備しています。
  4. 最後に電話callback.done(false);

競合するコンシューマーを持つという部分を除いて、すべてが機能します- それは常に一度に 1 つのコンシューマーです。

次のような多くのオプションを試しました。

  • さまざまな場所でスレッドプールを指定します.threads(10)
  • asyncConsumerやなどのエンドポイント オプションを使用するconcurrentConsumers

しかし、何か重要なものが欠けているようで、同時に機能させることができないようです。それを行う適切な方法は何ですか?

4

1 に答える 1

2

Camel 2.9 以降を使用している場合は、要求/応答を行う activemq エンドポイントで replyToType=Exclusive を使用することをお勧めします。これは、キューが排他的であることを Camel に伝え、予想される相関メッセージをピックアップするために JMS メッセージセレクターが必要ないため、高速化します。

Camel JMS ドキュメントのセクションRequest-reply over JMS以降を参照してください: http://camel.apache.org/jms

一時キューを使用する場合も、JMS メッセージ セレクターが必要ないため高速です。

また、ルートは直接エンドポイントから始まります。これは同期呼び出しであるため、呼び出し元は Exchange が完全に完了するまで待機/ブロックします。

また、スプリッター EIP は、並行処理を使用する並列モードで実行するように構成できます。また、分割するメッセージが大きい場合は、メッセージの内容全体をメモリにロードするのではなく、オンデマンドでメッセージを分割するストリーミングの使用を検討してください。

とにかく、ルートには多くのことが起こっています。どこに問題があるかをより正確に特定できますか? お手伝いしやすくなります。

于 2012-04-11T04:47:48.430 に答える