0

スプリング ブート 2.0.0.BUILD_SNAPSHOT とスプリング ブート webflux 5.0.0 を使用していますが、現在、要求に応じてフラックスをクライアントに転送することはできません。

現在、イテレータからフラックスを作成しています:

public Flux<ItemIgnite> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue());
        }
    });
}

そして、リクエストに応じて、私は単純にやっています:

@RequestMapping(value="/all", method=RequestMethod.GET, produces="application/json")
public Flux<ItemIgnite> getAllFlux() {
    return this.provider.getAllFlux();
}

localhost:8080/all10 秒後にローカルで呼び出すと、503ステータス コードが表示されます。/allまた、次を使用してリクエストしたときのクライアントと同様にWebClient

public Flux<ItemIgnite> getAllPoducts(){
    WebClient webClient = WebClient.create("http://localhost:8080");

    Flux<ItemIgnite> f = webClient.get().uri("/all").accept(MediaType.ALL).exchange().flatMapMany(cr -> cr.bodyToFlux(ItemIgnite.class));
    f.subscribe(System.out::println);
    return f;

}

何も起こりません。データは転送されません。

代わりに次のことを行うと:

public Flux<List<ItemIgnite>> getAllFluxMono() {
    return Flux.just(this.getAllList());
}

@RequestMapping(value="/allMono", method=RequestMethod.GET, produces="application/json")
public Flux<List<ItemIgnite>> getAllFluxMono() {
    return this.provider.getAllFluxMono();
}

それは働いています。通常、フラックスを使用せずにデータを転送するため、すべてのデータのロードがすでに完了し、クライアントに転送されたばかりだからだと思います。

それらのデータを要求する Web クライアントにデータをストリーミングするフラックスを取得するには、何を変更する必要がありますか?

編集

Ignite cache内にデータがあります。だから私getAllIteratorはigniteキャッシュからデータをロードしています:

public Iterator<Cache.Entry<String, ItemIgnite>> getAllIterator() {
    return this.igniteCache.iterator();
}

編集

flux.complete()@Simon Basléが提案したように追加:

public Flux<ItemIgnite> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue());
        }

        flux.complete(); // see here
    });
}

ブラウザの503問題を解決します。しかし、それは の問題を解決しませんWebClient。データはまだ転送されていません。

編集3

publishOnで使用Schedulers.parallel():

public Flux<ItemIgnite> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.<ItemIgnite>create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue());
        }

        flux.complete();
    }).publishOn(Schedulers.parallel());
}

結果は変わりません。

ここで、WebClient が受け取るものを投稿します。

value :[Item ID: null, Product Name: null, Product Group: null]
complete

したがって、彼は 1 つのアイテム (35.000 以上のうち) を取得しているように見え、値は null であり、彼は後で終了しています。

4

2 に答える 2