問題タブ [backpressure]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
javafx - RxJava - キーボード入力のバックプレッシャ?
これは楽しい RxJava の問題です。
RxJava バックプレッシャー演算子を使用して、Google が検索ページで行うのと同じように、各文字が入力されている間に入力された入力をすばやく検索したいと考えています。バックプレッシャーのドキュメントを調べたところ、これを思いつきました (JavaFX を活用するために RxJavaFX/RxKotlinFX を使用しています) 。
これはうまくいきます。コントロールに対して「Hello」とString
入力すると、200ミリ秒の入力なしで「Hello」が出力されます。しかし、本当に応答性を高めたい場合は、キーストローク入力ごとに何らかのローリング累積を行う必要があります。次に、コンソール出力は実際には次のようになります。
これらは、「Hello」という単語を入力したときのすべての排出量である必要があり、200ms は蓄積がリセットされるまでの経過時間を定義します。どうすればいいですか?
apache-flink - Apache Flink: バックプレッシャーをどのように処理しますか?
オペレーターの場合、入力ストリームはその出力ストリームよりも高速であるため、その入力バッファーは、データをこのオペレーターに転送する前のオペレーターの出力スレッドをブロックします。右?
Flink と Spark は両方とも、スレッドをブロックすることでバックプレッシャーを処理しますか? それで、それらの違いは何ですか?
データ ソースの場合、データを継続的に生成していますが、その出力スレッドがブロックされている場合はどうなるでしょうか? バッファがオーバーフローしますか?
buffer - RxJava:バッファ付きの遅いコンシューマと高速なコンシューマを同じソースに実装する方法は?
さまざまなバックプレッシャーのシナリオで遊んでいるときに、あるサブスクライバーがバッファーで遅く、別のサブスクライバーがスローされたものをすべて消費するケースを実装しました。それは Scala と Akka Streams を使用していました。必要に応じてここでコードを確認し、それを実行するテストをここで確認できます。
私は通常、比較のためにRxJavaバージョンを開発しようとしますが、これで行き詰まりました。Akka Streams では、2 つのチャネルでブロードキャストする 1 つのソースを使用してグラフを作成し、それらのチャネルから低速シンクと高速シンクを消費させることができます。各チャネルは、バッファリングとスロットリングを個別に適用できます。RxJava にはshare
ブロードキャスト用のオペレーターがありますが、バッファリングとスロットリングのロジックはSubscriber
ではなく にありObservable
ます。したがって、バッファリングとスロットリングを適用し、両方のサブスクライバーに影響を与えないようにする方法がわかりません。Akka Streams と RxJava の両方が Rx の実装であるため、必要なものを取得する方法があることを願っています。
ここに私がやろうとしていることの絵のバージョンがあります。
java - 適切な演算子を適用した後でも RxJava コードで BackPressure 例外が発生する
Android アプリケーションで RxJava を使用しています。interval() 関数を使用してタイマーを使用していますが、onBackPressureDrop() を追加したにもかかわらず、Missing Backpressure 例外が発生します。また、サブスクライバーに onError() を追加し、例外を Crashlytics に記録していますが、それでもクラッシュします。助けてください。私は問題を理解するのに1週間を費やしましたが、役に立ちませんでした。コードは散発的にクラッシュし、一度も再現できませんでした。TraceonError は、rxjava git の公式問題から直接取得されました。私は間違っているかもしれませんが、Crashlytics例外を印刷する責任があると思います。
私の遅延観察可能 -
私の観察対象 -
traceonError()-
クラッシュ -
android - RxJava onBackpressureBuffer がアイテムを発行しない
onBackpressureBuffer で奇妙な動作を確認しましたが、これが有効な動作なのかバグなのかわかりません。
特定のレートでアイテムを放出するtcp呼び出しを行っています(ストリーミングとinputStreamを使用していますが、それはいくつかの情報のためだけです)
その上に、準備が整うたびにアイテムを発行する create を使用してオブザーバブルを作成しました。
それをメッセージ()と呼びましょう。
それから私はこれをやっています:
MissingBackPressureException がめったにスローされない分析ツールを使用していることに気付いたので、呼び出しに onBackpressureBuffer を追加しました。
後に追加する場合observeOn
:
すべて正常に動作しますが、UI メインスレッドに到達した後にのみバッファリングされることを意味するため、次のようにすることを好みました。
そして、物事が奇妙になり始めるところです。
while がアイテムを放出し続けることに気付きましたが、messages()
ある時点でサブスクライバーへの配信が停止します。
より正確には、ちょうど 16 個のアイテムの後、明らかに起こっていることは、バッファーがアイテムを転送せずに保持し始めることです。
messages()
ある種のタイムアウトメカニズムでキャンセルすると、発行がmessages()
発生onError()
し、バッファは保持していたすべてのアイテムをすぐに発行します (それらは処理されます)。
あまりにも多くの作業を行った加入者のせいであるかどうかを確認しましたが、そうではありません。彼は終了しましたが、まだアイテムを取得していません...
request(n)
また、終了後に 1 つの項目を要求するサブスクライバーでメソッドを使用しようとしましonNext()
たが、バッファーは動作しません。
メインの Android UI スレッドのメッセージング システムがバックプレッシャーを引き起こしていると思われますが、その理由は説明できません。
誰かがなぜこれが起こっているのか説明できますか? これはバグですか、それとも有効な動作ですか? Tnx!
scala - リソース不足に基づいて観測可能なバックプレッシャー
RxJava 1 / RxScala では、次の状況で観察可能なソースをどのように抑制/バックプレッシャできますか?
可能な解決策は、ブロックすることです。これは機能しますが、それは非常に洗練されておらず、複数の同時リクエストを防ぎます:
node.js - Node.js: 書き込み可能なストリームのバック プレッシャに間違った余分なバイトが書き込まれました
クライアントからサーバーにファイルを転送するためにSocket.ioで簡単なバイナリ転送を行いました。うまくいったと思ったのですが、ファイルのサイズが違うことに気付きました。writableStream.write が失敗した場合、drain イベント ハンドラを追加して、書き換え可能になるまで待機し、書き込みを続行しましたが、drain イベントが発生するたびに、drain イベントが発生した回数だけファイルのサイズが増加し、10240 バイトごとにサイズが増加します。チャンク送信ごとに設定します。
ここでコードを記述する前に、コード フローを説明する必要があります。
- クライアント要求アップロード ファイル
- サーバーは空のファイルを作成し(書き込み可能なストリームを作成)、送信を許可します
- クライアント転送データ(チャンク) 終了まで
- サーバーは書き込み可能なストリームでチャンクを書き込みます
- クライアントはすべて送信された時点で送信を終了します
- サーバーは書き込み可能なストリームを閉じます。
- 終わり!
これはサーバー側のコードです:
そしてそれはクライアントです(バイナリファイルを読み取るためのコードを追加):
このコードを 4,162,611 バイト サイズのファイルでテストしたところ、1 回の書き込み失敗 (1 回のバック プレッシャ) がありました。アップロード後、作成したファイルのサイズを確認したところ、元のファイルより10240バイト大きい4,172,851バイトで、chunk(10240)のサイズでした。
サイズが元のサイズよりも 20480 バイト大きく、送信したチャンクのサイズが 2 倍になるため、書き込みが 2 回失敗することがあります。
Backpressure コードを再確認しましたが、問題はないようです。Node v6.2.2 と Socket.io v1.6.0 を使用しており、Chrome ブラウザーからテストされています。私が見逃したものはありますか?それとも背圧について誤解していましたか?どんなアドバイスでも大歓迎です。
アップデート
バックプレッシャーが発生すると、同じデータが2回書き込まれたようです(コメントで述べたように)。そこで、次のようにコードを修正しました。
出来た。書き込み可能なストリームが書き込みに失敗すると、ストリームにデータが書き込まれませんが、実際には書き込まれないため、かなり混乱しています。誰もこれについて知っていますか?
java - 「executeAsync」を使用しているときにcassandraへの書き込みリクエストを調整するにはどうすればよいですか?
cassandra クラスターに接続するために datastax Java ドライバー 3.1.0 を使用しています。cassandra クラスターのバージョンは 2.0.10 です。QUORUMの一貫性を保ちながら非同期に書いています。
上記の save メソッドは、複数のスレッドから非常に高速で呼び出されます。
質問:
executeAsync
Cassandra に非同期で書き込むメソッドへのリクエストを抑制したいと考えています。私の Cassandra クラスターが処理できる速度よりも非常に高速で書き込みを行うと、エラーがスローされ始めます。すべての書き込みが損失なく cassandra に正常に行われるようにする必要があります。
この投稿を見ましたが、解決策はSemaphore
固定数の許可で使用することです。しかし、それを実装するための最良の方法と方法がわかりません。私は以前にセマフォを使用したことがありません。これがロジックです。誰でも私のコードにセマフォベースの例を提供できますか、またはより良い方法/オプションがある場合は、私にも知らせてください。
データローダ プログラムを作成するコンテキストでは、次のようなことができます。
- 物事をシンプルに保つには、セマフォまたは固定数の許可を持つその他の構造を使用します (これは、インフライト リクエストの最大数になります)。executeAsync を使用してクエリを送信するたびに、許可を取得します。セマフォからパーミットを取得してクエリを実行するスレッドは 1 つだけ必要です (ただし、これを実行する # cpu コア サイズのプールを導入する必要がある場合があります)。利用可能な許可が得られるまで、取得時にブロックされます。
- executeAsync から返される未来には、Futures.addCallback を使用します。コールバックは、onSuccess と onFailure の両方のケースで Sempahore.release() を呼び出す必要があります。パーミットを解放することで、ステップ 1 のスレッドが続行し、次のリクエストを送信できるようになります。
また、使用について話している他の投稿をいくつか見RingBuffer
ましGuava RateLimitter
たが、どちらが優れていて、使用する必要がありますか? 以下は私が考えることができるオプションです:
- セマフォの使用
- リングバッファの使用
- Guava レート リミッターの使用
リクエストを抑制したり、cassandra 書き込みのバックプレッシャーを取得したり、すべての書き込みが cassandra に正常に送信されるようにしたりする方法の例を教えてください。
node.js - ストリーム処理のバック プレッシャーとは
私はノードとストリームの学習を開始しました。私が読んだほとんどのドキュメントでは、大きなサイズのファイルを処理しているときに「背圧の問題」について言及していますが、明確な説明は見つかりませんでした。この問題が正確に何であるか。また、パイプを使用するとこの問題を解決できると読みましたが、パイプは背圧の問題を正確にどのように修正しますか?
事前に説明していただきありがとうございます。