4

私はリアクターまたはリアクティブプログラミング全体をプロジェクトするのに非常に慣れていないので、おそらく何か間違ったことをしています。次のことを行うフローを構築するのに苦労しています。

クラスエンティティが与えられた場合:

Entity {
    private Map<String, String> items;
    public Map<String, String> getItems() {
        return items;
    }
}
  1. DB からエンティティを読み取る ( ListenableFuture<Entity> readEntity())
  2. すべてのアイテムに対していくつかの並列非同期処理を実行します ( boolean processItem(Map.Entry<String, String> item))
  3. すべてが完了したら doneProcessing ( void doneProcessing(boolean b))を呼び出します

現在、私のコードは次のとおりです。

handler = this;
Mono
    .fromFuture(readEntity())
    .doOnError(t -> {
        notifyError(“some err-msg” , t);
        return;
    })
    .doOnSuccess(e -> log.info("Got the Entity: " + e))
    .flatMap( e -> Flux.fromIterable(e.getItems().entrySet()))
    .all(handler::processItem)
    .consume(handler::doneProcessing);

動作しますが、handler::processItem呼び出しはすべてのアイテムで同時に実行されません。and とandの両方をさまざまなパラメーターとともに使用dispatchOnしてみましたが、呼び出しは 1 つのスレッドで連続して実行されます。私は何を間違っていますか?publishOnioasync SchedulerGroup

それとは別に、一般的に上記は改善できると確信しているので、提案をいただければ幸いです。

ありがとう

4

1 に答える 1

4

個々のマップ要素ごとに計算をフォークして結合する別の flatMap が必要です。

Mono.fromFuture(readEntity())
.flatMap(v -> Flux.fromIterable(v.getItems().entrySet()))
.flatMap(v -> Flux.just(v)
                .publishOn(SchedulerGroup.io())
                .doOnNext(handler::processItem))
.consume(handler::doneProcessing);
于 2016-03-30T12:59:08.087 に答える