さまざまなバックプレッシャーのシナリオで遊んでいるときに、あるサブスクライバーがバッファーで遅く、別のサブスクライバーがスローされたものをすべて消費するケースを実装しました。それは Scala と Akka Streams を使用していました。必要に応じてここでコードを確認し、それを実行するテストをここで確認できます。
私は通常、比較のためにRxJavaバージョンを開発しようとしますが、これで行き詰まりました。Akka Streams では、2 つのチャネルでブロードキャストする 1 つのソースを使用してグラフを作成し、それらのチャネルから低速シンクと高速シンクを消費させることができます。各チャネルは、バッファリングとスロットリングを個別に適用できます。RxJava にはshare
ブロードキャスト用のオペレーターがありますが、バッファリングとスロットリングのロジックはSubscriber
ではなく にありObservable
ます。したがって、バッファリングとスロットリングを適用し、両方のサブスクライバーに影響を与えないようにする方法がわかりません。Akka Streams と RxJava の両方が Rx の実装であるため、必要なものを取得する方法があることを願っています。
ここに私がやろうとしていることの絵のバージョンがあります。