0

私の API は、2 つの別個のサービスに対してペアで約 100 のダウンストリーム呼び出しを行います。クライアントに応答を返す前に、すべての応答を集約する必要があります。hystrix-feign を使用して HTTP 呼び出しを行います。

rxJavaドキュメントで次のことを見つけるまで、エレガントなソリューションだと信じていたものを思いつきました

BlockingObservable は、ブロッキング オペレータを提供するさまざまな Observable です。これはテストやデモの目的には役立ちますが、通常、実稼働アプリケーションには適していません (BlockingObservable を使用する必要があると思われる場合、これは通常、設計を再考する必要があることを示しています)。

私のコードはおおよそ次のようになります

List<Observable<C>> observables = new ArrayList<>();
for (RequestPair request : requests) {
    Observable<C> zipped = Observable.zip(
         feignClientA.sendRequest(request.A()),
         feignClientB.sendRequest(request.B()),
         (a, b) -> new C(a,b));
    observables.add(zipped);
}

Collection<D> apiResponse = = new ConcurrentLinkedQueue<>();

Observable
    .merge(observables)
    .toBlocking()
    .forEach(combinedResponse -> apiResponse.add(doSomeWork(combinedResponse)));

return apiResponse;

この設定に基づくいくつかの質問:

  1. 私のユースケースを考えると toBlocking() は正当化されますか
  2. メインスレッドが forEach() に到達するまで、実際の HTTP 呼び出しは行われないことを理解しているのは正しいですか?
  3. forEach() ブロック内のコードが異なるスレッドによって実行されることは確認しましたが、forEach() ブロック内に複数のスレッドが存在する可能性があるかどうかは確認できませんでした。同時実行はありますか?
4

1 に答える 1

1

より良いオプションは、他のオペレーターによって消費されるように を返すことObservableですが、コードをブロックすることを回避できます (ただし、バックグラウンド スレッドで実行する必要があります)。

public Observable<D> getAll(Iterable<RequestPair> requests) {
    return Observable.from(requests)
    .flatMap(request ->
        Observable.zip(
            feignClientA.sendRequest(request.A()),
            feignClientB.sendRequest(request.B()),
            (a, b) -> new C(a,b)
        )
    , 8)  // maximum concurrent HTTP requests
    .map(both -> doSomeWork(both));
}

// for legacy users of the API
public Collection<D> getAllBlocking(Iterable<RequestPair> requests) {
    return getAll(requests)
        .toList()
        .toBlocking()
        .first();
}

メインスレッドが forEach() に到達するまで、実際の HTTP 呼び出しが行われないことを理解しているのは正しいですか?

はい、forEach一連の操作全体をトリガーします。

forEach() ブロック内のコードが異なるスレッドによって実行されることは確認しましたが、forEach() ブロック内に複数のスレッドが存在する可能性があるかどうかは確認できませんでした。同時実行はありますか?

一度に 1 つのスレッドのみがラムダを実行できforEachますが、実際には別のスレッドがそこに入ることがあります。

于 2016-11-23T09:07:25.677 に答える