0

時々、私が関与しているプロジェクト全体で従来の同時生産者と消費者のソリューションを実装する必要がありますが、複数のスレッドから取り込まれ、複数の消費者によって消費されるコレクションを持つことで、ほとんどの問題が軽減されます。一言で言えば、コレクションは 10,000 個のエンティティにバインドされていると言います。バッファ サイズに達すると、これらの 10,000 個のエンティティを消費するワーカー タスクが送信されます。そのようなワーカーの制限があり、そのセットは 10 に設定されています。それぞれが 10,000 エンティティを消費する 10 個のワーカーまで。

ここでいくつかのロックをいじる必要があり、バッファ オーバーフローに関するいくつかのチェック (すべてのワーカーがチャンクの処理でビジー状態である間にプロデューサーが大量のデータを生成する場合) をチェックする必要があるため、OOM を回避するために新しいイベントを破棄する必要があります (最善の解決策ではありませんが、安定性はp1 ;))

最近、reactor の周りと、低レベルに行く代わりにそれを使用して上記のすべてのことを行う方法を探していたので、愚かな質問は次のとおりです。今のところ、オーバーフロー/破棄については忘れてください..ブロードキャスターのN個のコンシューマーをどのように達成できますか?

特に、バッファ + スレッド プール ディスパッチャを使用してブロードキャスタを調べていました。

void test() {
  final Broadcaster<String> sink =   Broadcaster.create(Environment.initialize());
  Dispatcher dispatcher = Environment.newDispatcher(2048, 20, DispatcherType.WORK_QUEUE);

  sink
    .buffer(100)
    .consumeOn(dispatcher, this::log);

  for (int i=0; i<100000; i++) {
    sink.onNext("elementent " + i);
    if (i%1000 == 0) {
      System.out.println("addded elements " + i);
    }
  }
}
 void log(List<String> values) {
  System.out.print("simulating slow processing....");
  System.out.println("processing: " + Arrays.toString(values.toArray()));
  try {
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

ここでの私の意図は、ブロードキャスターがバッファサイズに達したときに非同期で log(..) を実行することですが、常にブロックモードで log(...) を実行しているようです。100 を実行したら、次の 100 などを実行します。どうすれば非同期にできますか?

ありがとう

4

1 に答える 1

0

可能なパターンは、publishOn で flatMap を使用することです。

Flux.range(1, 1_000_000)
.buffer(100)
.flatMap(b -> Flux.just(b).publishOn(SchedulerGroup.io())
   .doOnNext(this::log))
.consume(...);
于 2016-03-30T13:14:36.153 に答える