定義済みの IntegrationFlow で Spring Integration Java DSL を使用しています。応答にデータの一部が欠落しており、アグリゲーター応答のcorrelationId が、呼び出し元のサービスによって受信された応答の値と一致しないという動作が見られます。
バックグラウンド:
ランダム データを使用し、毎分 600 リクエストで実行されているサーバーで JMeter パフォーマンス テストを実行しています。私のラップトップでは、同じサーバーにヒットする SoapUI パフォーマンス テストを実行しています。SoapUI プロジェクトは、1 分あたり 60 リクエストの割合で、同じ検索基準 (マッチングを行っています) を持つリクエストを送信します。応答にはすべて同じ結果データが含まれている必要があります。
約 0.5% の確率で、データが欠落した状態で応答が返されます。これらの応答では、アグリゲーターからログに記録された応答の correlationId と、呼び出し元のサービスからログに記録された応答 (応答が呼び出し元のサービスに返され、既にアグリゲーターを通過した後にログに記録された) のcorrelationId が一致しません。
何が間違っているのですか?以下のコード スニペットを参照してください。
@Configuration
@EnableAutoConfiguration
@Import(.....AServiceConfig.class)
public class ServiceConfig {
@Bean(name = "inputChannel")
public DirectChannel inputChannel() {
return new DirectChannel();
}
@Bean(name = "outputChannel")
public QueueChannel outputChannel() {
return new QueueChannel();
}
@Bean(name = "transactionLogger")
public ourLogger ourTransactionLogger() {
return OurLoggerFactory.getLogger("ourAppTrx", new ourLoggerConfig(ourTransactionLoggerKey.values()));
}
public IntegrationFlow ourFlow() {
return IntegrationFlows.from(inputChannel())
.split(splitter(ourTransactionLogger()))
.channel(MessageChannels.executor(getExecutor()))
.handle(ourServiceActivator, "service")
.aggregate(t -> t.processor(ourAggregator, AGGREGATE))
.channel(outputChannel())
.get();
}
@Bean(name = "executor")
public Executor getExecutor()
{
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
//snippet from calling service
public InquiryResponse inquire(InquiryRequest request) {
inputChannel.send(MessageBuilder.withPayload(request).build());
Message<?> msgResponse = outputChannel.receive();
InquiryResponse response = (InquiryResponse) msgResponse.getPayload();
TransactionLogger.debug("correlationId + msgResponse.getHeaders().get("correlationId"));
TransactionLogger.debug("InquiryService inquire response = " + response.toString());
return response;
}
//snippet from aggregator
@Aggregator
public <T> InquiryResponse aggregate(List<Message> serviceResponses) {
InquiryResponse response = new InquiryResponse();
serviceResponses.forEach(serviceResponse -> {
Object payload = serviceResponse.getPayload();
if (payload instanceof AMatchResponse) {
response.setA(((AMatchResponse) payload).getA());
} else if (payload instanceof BValueResponse) {
response.setB(((BValueResponse) payload).getB());
} else if (payload instanceof BError) {
response.setB(new B().addBErrorsItem((BError) payload));
} else if (payload instanceof AError) {
response.setA(new A().AError((AError) payload));
} else {
transactionLogger.warn("Unknown message type received. This message will not be aggregated into the response. ||| model=" + payload.getClass().getName());
}
});
transactionLogger.debug("OurAggregator.response = " + response.toString());
return response;
}