私はリアクターまたはリアクティブプログラミング全体をプロジェクトするのに非常に慣れていないので、おそらく何か間違ったことをしています。次のことを行うフローを構築するのに苦労しています。
クラスエンティティが与えられた場合:
Entity {
private Map<String, String> items;
public Map<String, String> getItems() {
return items;
}
}
- DB からエンティティを読み取る (
ListenableFuture<Entity> readEntity()
) - すべてのアイテムに対していくつかの並列非同期処理を実行します (
boolean processItem(Map.Entry<String, String> item)
) - すべてが完了したら doneProcessing (
void doneProcessing(boolean b)
)を呼び出します
現在、私のコードは次のとおりです。
handler = this;
Mono
.fromFuture(readEntity())
.doOnError(t -> {
notifyError(“some err-msg” , t);
return;
})
.doOnSuccess(e -> log.info("Got the Entity: " + e))
.flatMap( e -> Flux.fromIterable(e.getItems().entrySet()))
.all(handler::processItem)
.consume(handler::doneProcessing);
動作しますが、handler::processItem
呼び出しはすべてのアイテムで同時に実行されません。and とandの両方をさまざまなパラメーターとともに使用dispatchOn
してみましたが、呼び出しは 1 つのスレッドで連続して実行されます。私は何を間違っていますか?publishOn
io
async
SchedulerGroup
それとは別に、一般的に上記は改善できると確信しているので、提案をいただければ幸いです。
ありがとう