問題タブ [project-reactor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
2022 参照

java - Reactor Publisher の適切な使用方法

Reactor を使用してパブリッシャー/サブスクライバーのシナリオを適切に実装する方法がわかりません。私は実用的な解決策を持っていますが、実装は私には正しくないようです:

私の問題は、サブスクライバーを登録してイベントを渡すためにパブリッシャーを手動で実装する必要があることです。

次に、WorkQueue Processor (コンシューマーである必要があります) があります。

それは正常に動作しますが、地獄のように醜いです。Spring ガイドから抜粋したこの例では、EventBus を使用してパブリッシャーからコンシューマーにイベントをルーティングしますが、それを自分のプロセッサにリンクしようとすると、次のコンパイラ エラーが発生します。

パブリッシャーとプロセッサーをリンクさせる最善の方法は何ですか? EventBus の使用をお勧めしますか? もしそうなら、適切な呼び出しは何ですか?

ありがとう!

0 投票する
1 に答える
487 参照

java - リアクターを使用してコレクションを同時に消費する

時々、私が関与しているプロジェクト全体で従来の同時生産者と消費者のソリューションを実装する必要がありますが、複数のスレッドから取り込まれ、複数の消費者によって消費されるコレクションを持つことで、ほとんどの問題が軽減されます。一言で言えば、コレクションは 10,000 個のエンティティにバインドされていると言います。バッファ サイズに達すると、これらの 10,000 個のエンティティを消費するワーカー タスクが送信されます。そのようなワーカーの制限があり、そのセットは 10 に設定されています。それぞれが 10,000 エンティティを消費する 10 個のワーカーまで。

ここでいくつかのロックをいじる必要があり、バッファ オーバーフローに関するいくつかのチェック (すべてのワーカーがチャンクの処理でビジー状態である間にプロデューサーが大量のデータを生成する場合) をチェックする必要があるため、OOM を回避するために新しいイベントを破棄する必要があります (最善の解決策ではありませんが、安定性はp1 ;))

最近、reactor の周りと、低レベルに行く代わりにそれを使用して上記のすべてのことを行う方法を探していたので、愚かな質問は次のとおりです。今のところ、オーバーフロー/破棄については忘れてください..ブロードキャスターのN個のコンシューマーをどのように達成できますか?

特に、バッファ + スレッド プール ディスパッチャを使用してブロードキャスタを調べていました。

ここでの私の意図は、ブロードキャスターがバッファサイズに達したときに非同期で log(..) を実行することですが、常にブロックモードで log(...) を実行しているようです。100 を実行したら、次の 100 などを実行します。どうすれば非同期にできますか?

ありがとう

0 投票する
1 に答える
3735 参照

java - プロジェクト原子炉のフラックスの同時処理

私はリアクターまたはリアクティブプログラミング全体をプロジェクトするのに非常に慣れていないので、おそらく何か間違ったことをしています。次のことを行うフローを構築するのに苦労しています。

クラスエンティティが与えられた場合:

  1. DB からエンティティを読み取る ( ListenableFuture<Entity> readEntity())
  2. すべてのアイテムに対していくつかの並列非同期処理を実行します ( boolean processItem(Map.Entry<String, String> item))
  3. すべてが完了したら doneProcessing ( void doneProcessing(boolean b))を呼び出します

現在、私のコードは次のとおりです。

動作しますが、handler::processItem呼び出しはすべてのアイテムで同時に実行されません。and とandの両方をさまざまなパラメーターとともに使用dispatchOnしてみましたが、呼び出しは 1 つのスレッドで連続して実行されます。私は何を間違っていますか?publishOnioasync SchedulerGroup

それとは別に、一般的に上記は改善できると確信しているので、提案をいただければ幸いです。

ありがとう

0 投票する
5 に答える
25942 参照

java - Flux.map() 実行時のエラー処理方法

Flux 内の要素をマッピングするときにエラーを処理する方法を理解しようとしています。

たとえば、CSV 文字列をビジネス POJO の 1 つに解析しています。

この行の一部にエラーが含まれている可能性があるため、ログには次のように表示されます。

API でいくつかのエラー処理メソッドを読みましたが、ほとんどは「エラー値」を返すか、次のようなフォールバック Flux を使用することを参照していました。

ただし、これを my で使用myfluxすると、フラックス全体が再度処理されます。

では、特定の要素の処理中にエラーを処理し (つまり、それらを無視する/ログに記録する)、残りのフラックスの処理を続ける方法はありますか?

@akarnokd 回避策で更新

ただし、コードが以前よりも洗練されていないことがわかります。Flux API には、このコードが行うことを行うメソッドがありませんか?

0 投票する
2 に答える
1345 参照

java - reactor-core - java.lang.IllegalStateException: キューがいっぱい?! ホットパブリッシャー (ConnectableFlux) で

これまで RxJava を使っていましたが、reactive stream 仕様に準拠しているため、projectreactor.io の reactor-core をいじり始めています。

次のテストでは、乱数を生成するホット フラックス (ConnectableFlux) を作成します。すぐに connect() すると、256 個の値がプリフェッチされます (実際には 258 個がログに表示されます)。サブスクライバーがしばらくしてからサブスクライブしないことをシミュレートするために、5 秒間待機します。

メイン スレッドが起動した後、RnApp は ConnectableFlux をサブスクライブしrandomNumberGenerator.subscribe(new RnApp());ます。次にRnApp.onSubscribe()が呼び出され、10 個の要素が要求されます。その後、java.lang.IllegalStateException: Queue full?!例外が発生します (RnApp.onError()呼び出されます)、なぜですか?

加入者:

パブリッシャーテスト:

ログ:

0 投票する
0 に答える
282 参照

java - スプリングリアクターでのタスクの調整と同期

スプリング リアクター フレームワークを使用しており、作業はタスクの観点から実行されます。1 つのメイン タスクがあり、複数の子タスクが作成されます。すべての子の実行が完了すると、メイン タスクを使用してイベントを生成する必要があります。私はJavaの同時実行構造について認識しており、カウントダウンラッチなどを使用して実現できます。しかし、リアクターフレームワークでは、ストリームとプロミスを使用してタスクの調整を行う必要があることを読みました。

スプリングリアクターでのタスク調整に関する詳細情報を提供する便利なリンクと例が必要です。

0 投票する
0 に答える
494 参照

spring - リアクターの FluxProcessor.wrap(上流、下流)を理解する

プロセッサー (RxJava のサブジェクト) は、パブリッシャーとサブスクライバーの両方として機能するため、パブリッシャーにサブスクライブでき、さらにサブスクライブして、最上位のサブスクライバーから取得した値を渡すことができます。

FluxProcessor.wrap()はこのスキーマにどのように適合しますか? たとえば、 a から値を取得し、サブスクライブして値を取得できるFluxProcessorwithを作成したいと思います。FluxProcessor.wrapFlux.range()