私は、reactor を使用してアプリケーション全体にイベントを発行し、さまざまなコンシューマーにイベントに応答させています。
これが私のリアクター構成です
@Configuration
@EnableReactor
public class ReactorConfiguration {
static {
Environment.initializeIfEmpty().assignErrorJournal();
}
@Bean
public EventBus eventBus() {
return EventBus.config().env(Environment.get()).dispatcher(Environment.SHARED).get();
}
デフォルトのリング バッファ ベースのディスパッチャが使用され、単一のコンシューマに送信された複数のメッセージが並行して処理されることを期待しています。代わりに、イベントを同期的に処理しているようです。スレッドshared-1
は、イベント 1 をコンシューマー 1 に処理するために使用され、イベント 1 の処理が完了した後でのみ、同じスレッドがコンシューマー 1 でイベント 2 の処理を開始します。
複数のイベントを複数のコンシューマーに送信し、すべてのイベントを並行して処理できるように、並列処理を実現するにはどうすればよいですか。
提案をいただければ幸いです。
これは、イベントバスにイベントをディスパッチする方法です
dispatch(ReactorEvents.REPORT_REQUEST_EVENT, "", event);
protected <T> void dispatch(String selector, String info, T event) {
eventBus.notify(selector, Event.wrap(Tuple.of(info, event)));
}
これが消費者の1人です
@Consumer
public class ReportRequestHandler {
...
@Selector(ReactorEvents.REPORT_REQUEST_EVENT)
@Override
public void handleRequest(Tuple2<String, ReportRequestEvent> tuple) {
ReportRequestEvent event = tuple.getT2();
log.debug("processing report request " + event.getId());
....
}
}