私の API は、2 つの別個のサービスに対してペアで約 100 のダウンストリーム呼び出しを行います。クライアントに応答を返す前に、すべての応答を集約する必要があります。hystrix-feign を使用して HTTP 呼び出しを行います。
rxJavaドキュメントで次のことを見つけるまで、エレガントなソリューションだと信じていたものを思いつきました
BlockingObservable は、ブロッキング オペレータを提供するさまざまな Observable です。これはテストやデモの目的には役立ちますが、通常、実稼働アプリケーションには適していません (BlockingObservable を使用する必要があると思われる場合、これは通常、設計を再考する必要があることを示しています)。
私のコードはおおよそ次のようになります
List<Observable<C>> observables = new ArrayList<>();
for (RequestPair request : requests) {
Observable<C> zipped = Observable.zip(
feignClientA.sendRequest(request.A()),
feignClientB.sendRequest(request.B()),
(a, b) -> new C(a,b));
observables.add(zipped);
}
Collection<D> apiResponse = = new ConcurrentLinkedQueue<>();
Observable
.merge(observables)
.toBlocking()
.forEach(combinedResponse -> apiResponse.add(doSomeWork(combinedResponse)));
return apiResponse;
この設定に基づくいくつかの質問:
- 私のユースケースを考えると toBlocking() は正当化されますか
- メインスレッドが forEach() に到達するまで、実際の HTTP 呼び出しは行われないことを理解しているのは正しいですか?
- forEach() ブロック内のコードが異なるスレッドによって実行されることは確認しましたが、forEach() ブロック内に複数のスレッドが存在する可能性があるかどうかは確認できませんでした。同時実行はありますか?