2

キュー要素ごとに http リクエストを実行したい。これらのリクエストは並行して呼び出す必要があります。
また、すべてのリクエストの終了を待つ必要があります。

次のコードを開発しました。

 List<Mono<MyResponseDTO>> monoList = queue.stream()
                .map(jobStatusBunch -> webClient
                        .post()
                        .uri("localhost:8080/api/some/url")
                        .bodyValue(convertToRequestDto(someBean))
                        .retrieve()
                        .toEntity(String.class)
                        .filter(HttpEntity::hasBody)
                        .map(stringResponseEntity -> {
                            try {
                                return objectMapper.readValue(stringResponseEntity.getBody(), MyResponseDTO.class);
                            } catch (JsonProcessingException e) {
                                log.error("Can't parse", e);
                                return null;
                            }
                        })
                        .doOnNext(myResponseDTO -> {
                            log.info("doOnNext is invoked");
                        })
                ).collect(Collectors.toList());
          //await when all MONOs are completed

log.info("Start waiting for {}", monoList);
Mono<Void> mono = Flux.fromIterable(monoList)
        .flatMap(Function.identity())
        .then();
log.info("Finished waiting for {}", monoList);

キューに単一の要素がある場合、次のログが表示されます。

2019-11-19 19:17:17.733  INFO 5896 --- [   scheduling-1] c.b.m.service.MyService     : Start waiting for [MonoPeek]
2019-11-19 19:17:25.988  INFO 5896 --- [   scheduling-1] c.b.m.service.MyService     : Finished waiting for [MonoPeek]
2019-11-19 19:17:26.015 TRACE 5896 --- [   scheduling-1] o.s.w.r.f.client.ExchangeFunctions       : [c42c1c2] HTTP POST localhost:8080/api/some/url, headers={}
2019-11-19 19:17:48.230  INFO 5896 --- [tor-http-nio-11] c.b.m.service.MyService     : doOnNext is invoked

したがって、このコードではリクエストの終了を待つことはできません。

どうすればそれを達成できますか?

PS

Flux.merge(monoList).blockLast()私が必要とするのはsmthのようです。それは正しく動作しますか?

4

2 に答える 2