複数のCompeting Consumersによって同時に処理される必要があるRequest/Reply交換の実装に苦労しています。
タスクのキューの生成を担当するスタンドアロンのマスターモジュールが 1 つあります。そして、そのキューからのメッセージを同時に消費するべき多くのWorkerモジュールがあります。
これはCamel ルーティングのMaster部分です。
from("direct:start")
.to("log:FROM.DIRECT?level=DEBUG")
.split(body()).setHeader(CamelHeader.TASKS_BATCH_ID, simple("BATCH-1"))
.setHeader(CamelHeader.TASK_TYPE, simple(TaskType.FETCH_INDEX))
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
EdgarFullIndexLocation location =
exchange.getIn().getBody(EdgarFullIndexLocation.class);
exchange.getIn().setBody(location.getId().toJson(), String.class);
}
})
.to("log:SPLIT?level=DEBUG")
.setExchangePattern(ExchangePattern.InOut)
.to("activemq:queue:tasksQueue?replyTo=completionsQueue" +
//"&transactedInOut=true" +
"&requestTimeout=" + Integer.MAX_VALUE +
"&disableTimeToLive=true")
.threads(10)
.to("log:RESPONSE?level=DEBUG")
.routeId(routeId);
これは、Camel ルーティングのWorker部分で、ここでキューを使用します。
from("activemq:queue:tasksQueue?asyncConsumer=true" +
"&concurrentConsumers=10")
.to("log:FROM.TASKS.QUEUE?level=DEBUG")
.choice()
.when(header(CamelHeader.TASK_TYPE).isEqualTo(TaskType.FETCH_INDEX))
.process(new FetchIndexTaskProcessor())
.otherwise()
.to("log:UNKNOWN.TASK?level=DEBUG");
ここでFetchIndexTaskProcessorは AsyncProcessorを実装します:
public class FetchIndexTaskProcessor implements AsyncProcessor {
@Override public void process(Exchange exchange) throws Exception {}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
FetchIndexTask task = new FetchIndexTask(exchange, callback);
task.start();
return false;
}
}
ここでFetchIndexTaskはThreadを拡張します。start()の後、新しいスレッドは以下を担当します。
- ルートを動的に追加します。
- そのルートの交換が完了するまでブロックします。
- 元の交換への返信を準備しています。
- 最後に電話
callback.done(false);
。
競合するコンシューマーを持つという部分を除いて、すべてが機能します- それは常に一度に 1 つのコンシューマーです。
次のような多くのオプションを試しました。
- さまざまな場所でスレッドプールを指定します
.threads(10)
。 asyncConsumer
やなどのエンドポイント オプションを使用するconcurrentConsumers
しかし、何か重要なものが欠けているようで、同時に機能させることができないようです。それを行う適切な方法は何ですか?