問題タブ [backpressure]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - オブザーバブルの要素を正しい(リアルタイム)時間で渡す方法は?
いくつかのルートのターンポイントを示す2Dポイントを放出するaobservableがあるとします
これは、ロボットが origin から開始し(0,0)
、次に長方形の左下隅、次に右下隅に移動し、その後元に戻り、最後に原点に戻ることを意味します。
ここで、ロボットが一定の速度、たとえば 1 秒あたり 1 単位で動いているかのように、そのポイントにフィードするオブザーバブルが必要だとします。
そのため、最初のオペレーターはそれを受信(0,0)
し、そのまま再送信します。
(0, 10)
次に、このポイントまでの距離が単位であり、そのポイントに到達するのに数秒10
かかることを受信して確認します。10
したがって、新しいオブザーバブルは発行する必要があります
最後の受信ポイントに到達して再送信されるまで、毎秒 1 ペア。次に、オペレーターは次のポイントを取り、それで同じことを行う必要があります。
ReactiveX でこれを達成するにはどうすればよいですか?
ここで「バックプレッシャー」を実装して、結果として発生するすべてのものが時間内に再送信されるまで、観測可能なソースを遅くする必要があると思いますか?
アップデート
以下のコードを書きましたが、動作します。2 つflatMap
の s を使用します。1 つは追加のポイントを提供し、各ポイントにタイムスタンプ (秒単位) を付け、もう 1 つはflatMap
タイムスタンプに応じた遅延を導入します。
コードは機能しますが、複雑に見えます。問題は解決しません: ReactiveX ライブラリにこれらの種類の既製のツールが含まれているかどうか:
rx-java - 観察可能なサポートの反応的なプル
私はかなり基本的な質問であると信じていることにしばらく苦労してきました。
Flowable
ネットワークからアイテムのバンドルを取得してそれらを発行する があります。
リクエストごとに動作する必要があります。つまり、必要に応じSubscription
て呼び出しますsubscription.request(n)
。
Backpressureに関する記事(「リアクティブ プル」セクション)Observer
では、 の視点が説明されており、簡単に言及されています。
ただし、[リアクティブ プル] が機能するには、オブザーバブル A と B が request() に正しく応答する必要があります <...> そのようなサポートは、オブザーバブルの要件ではありません
このようなサポートが原則としてどのように達成されるかについては、詳細には触れません。
Observable
そのような/を実装するための一般的なパターンはありFlowable
ますか? 良い例や参照が見つかりませんでした。私が考えることができる1つの悪いオプションは、while
ループ内emitter.requested() == 0
でロックしdoOnRequest()
、別のスレッドからロックを解除するモニターを持つことです。しかし、このアプローチは面倒に感じます。
では、この「スローダウン」は一般的にどのように達成されるのでしょうか?
ありがとう。
java - RxJava2 観測可能な背圧
RxJava2
最近、背圧がどのように機能するかを理解していないことに気付きました。
私は小さなテストを作成しましたが、MissingBackpressureException
例外で失敗するはずです:
システムアウトは次のように表示されます:
生成しない理由MissingBackpressureException
。
サイズがより大きい場合、e.onNext(i);
アイテムはバッファに入れられると思いますObservableObserveOn
static final int BUFFER_SIZE = Math.max(16,Integer.getInteger("rx2.buffer-size",128).intValue());
MissingBackpressureException
発生しないものをスローする必要があります。バッファーは自動的に拡張されますか? そうでない場合、アイテムはどこに保管されていますか?