0

私は、reactor を使用してアプリケーション全体にイベントを発行し、さまざまなコンシューマーにイベントに応答させています。

これが私のリアクター構成です

@Configuration
@EnableReactor
public class ReactorConfiguration {

    static {
       Environment.initializeIfEmpty().assignErrorJournal();
    }

    @Bean
    public EventBus eventBus() {
       return EventBus.config().env(Environment.get()).dispatcher(Environment.SHARED).get();
}

デフォルトのリング バッファ ベースのディスパッチャが使用され、単一のコンシューマに送信された複数のメッセージが並行して処理されることを期待しています。代わりに、イベントを同期的に処理しているようです。スレッドshared-1は、イベント 1 をコンシューマー 1 に処理するために使用され、イベント 1 の処理が完了した後でのみ、同じスレッドがコンシューマー 1 でイベント 2 の処理を​​開始します。

複数のイベントを複数のコンシューマーに送信し、すべてのイベントを並行して処理できるように、並列処理を実現するにはどうすればよいですか。

提案をいただければ幸いです。

これは、イベントバスにイベントをディスパッチする方法です

dispatch(ReactorEvents.REPORT_REQUEST_EVENT, "", event);

protected <T> void dispatch(String selector, String info, T event) {
    eventBus.notify(selector, Event.wrap(Tuple.of(info, event)));
}

これが消費者の1人です

@Consumer
public class ReportRequestHandler {
...
  @Selector(ReactorEvents.REPORT_REQUEST_EVENT)
  @Override
  public void handleRequest(Tuple2<String, ReportRequestEvent> tuple) {
    ReportRequestEvent event = tuple.getT2();
    log.debug("processing report request " + event.getId());
    ....
  }
}
4

1 に答える 1