問題タブ [backpressure]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
javascript - RxJS が最初に実行され、次に調整されて待機します
mousewheel
RxJS-DOM を使用してイベントを監視し、最初のイベントが発生したときにそれを転送してから、後続の値の間の遅延が以前に指定された期間を経過するまですべての値をドロップするようにします。
私が想像している演算子は次のようになります。
次のデータ ストリームを想像してください。
ここで、送信される値は数値であり、時間は次の値が到着するまでの時間を表します。0 が最初の値であるため、それが出力され、その後の値の間の個々の遅延が より大きくないため、3 までのすべての値が削除されます500ms
。
スロットルとは異なり、値間の時間遅延は、最後に受信した値が発行されたかどうかに関係なく計算されます。スロットルを使用すると、0 が送信され、200 ミリ秒が経過し、1 が評価されて失敗し、400 ミリ秒が経過し、2 が評価されて合格になります。これは、最後に送信された値 (0) と現在受信されている値 (2 ) は 600 ミリ秒ですが、私のオペレーターでは、1 に対して相対的に評価され、経過時間は 400 ミリ秒になるため、テストに失敗します。
また、この演算子はデバウンスでもありません。発行する間隔が経過するまで待機する代わりに、最初の値を送信してから、将来のすべての値に対して評価を行います。
このようなオペレーターはすでに存在しますか?そうでない場合は、どうすれば作成できますか?
spring - Java8 ストリームまたはデータベース リクエストのリアクティブ / オブザーバー
Spring MVC アプリケーションの動作を再考しています。データベースからデータをプルする(Java8 ストリーム) か、データベースにデータをプッシュ(リアクティブ/オブザーバブル) させ、バックプレッシャーを使用して量を制御する方がよいかどうかです。
現在の状況:
User
最新の 30 件の記事をリクエストするService
データベース クエリを実行し、30 個の結果をList
Jackson
を繰り返し処理しList
、JSON 応答を生成します
なぜ実装を切り替えるのですか?
これらの 30 個のオブジェクトを常にメモリに保持しているため、かなりメモリを消費します。アプリケーションは一度に 1 つのオブジェクトを処理するため、これは必要ありません。ただし、アプリケーションは1 つのオブジェクトを取得して処理し、破棄して次のオブジェクトを取得できる必要があります。
Java8 ストリーム? (引く)
これjava.util.Stream
は非常に簡単です。は、バックグラウンドでデータベース カーソルを使用する をService
作成します。の 1 つの要素の JSON 文字列を書き込むStream
たびに、次の要素を要求し、データベース カーソルをトリガーして次のエントリを返します。Jackson
Stream
RxJava / リアクティブ / オブザーバブル? (押す)
ここでは逆のシナリオがあります。データベースはエントリごとにプッシュする必要があり、メソッドが呼び出されるJackson
まで各要素の JSON 文字列を作成する必要があります。onComplete
つまり、Controller
は にService
:をくださいObservable<Article>
。次にJackson
、処理できる限り多くのデータベース エントリを要求できます。
相違点と懸念事項:
次のデータベース エントリを要求してから取得/処理するまでにStreams
は、常に多少の遅延があります。これにより、ネットワーク接続が遅い場合、または応答を満たすために大量のデータベース要求を行う必要がある場合、JSON 応答時間が遅くなる可能性があります。
そこを使用RxJava
すると、常に処理可能なデータが必要になります。また、それが多すぎる場合は、バックプレッシャーを使用して、データベースからアプリケーションへのデータ転送を遅くすることができます。最悪のシナリオでは、バッファ/キューには要求されたすべてのデータベース エントリが含まれます。次に、メモリ消費量は、List
.
なぜ私は尋ねているのですか / 私は何を求めているのですか?
私は何を取りこぼしたか?他に長所/短所はありますか?
各データベース要求/応答間に常に (短い) 遅延がある場合、(特に) Spring Data Team
Stream
がデータベースからの応答をサポートするように API を拡張したのはなぜですか? これにより、要求されたエントリが大量になると、顕著な遅延が発生する可能性があります。RxJava
このシナリオでは (または他のリアクティブな実装) を使用することをお勧めしますか? または、欠点を見逃しましたか?
asynchronous - mapcat が core.async のバックプレッシャーを壊すときのメモリ リークはどこにありますか?
Clojure で core.async コードを書き、それを実行すると、利用可能なすべてのメモリが消費され、エラーで失敗しました。mapcat
core.async パイプラインで使用すると、プレッシャーが解消されるようです。(これは、この質問の範囲を超えた理由で残念です。)
以下は、 ing トランスデューサ:x
の出入りをカウントすることによって問題を示すコードです。mapcat
生産者は消費者よりはるかに先を行っています。
これを発見したのは私が初めてではないようです。しかし、ここで与えられた説明はそれをカバーしていないようです。(ただし、適切な回避策は提供されます。) 概念的には、プロデューサーが先行していると思いますが、チャネルにバッファリングされる可能性のあるいくつかのメッセージの長さだけです。
私の質問は、他のすべてのメッセージはどこにありますか? 出力の 4 行目までに、7000:x
秒が不明です。
reactive-programming - ファンアウト機能を備えたリアクティブ ストリーム アクター システムを設計する方法
バックプレッシャー機能を備えたアクターベースのシステムを実装しようとしています。要件として、マスター プロセスはストリーミング データを JSON 形式で受信します。ただし、各 JSON イベントには、{ip: '123.43.12.1', country: 'US', ... etc} などのいくつかのフィールドがあります。JSON の構造は事前にわかっています。
さて、どうにかして JSON 構造を (キー、値) ペアにフラット化する必要があります。たとえば、上記のデータは (ip, freq), (country, freq) のように平坦化できます。ここで、freq は、IP (「123.43.12.1」など) がデータ ストリームに現れる回数です。
非常に自然な方法は、さらに評価するために、各 (キー、値) ペアを対応する子/リモート アクターに転送することです。たとえば、('123.43.12.1', 1) は IP-Actor に送信されます。('US', 1) は Country-Actor などに送信されます。
システム全体が背圧であることを確認したい。この場合、イベント {ip: '123.43.12.1', country: 'US'} は、IP-Actor と Country-Actor の両方が平坦化されたペア ('123.43. 12.1', 1), ('米国', 1). 各アクターの処理速度は異なる場合があります (たとえば、IP-Actor は Country-Actor よりもはるかに高速です)。その場合、ストリームを受信したマスター プロセスが要求信号があるまで待機/ブロックする必要があります (両方のアクターがメールボックス内の既存のデータの処理を終了したときに発生します)。そうしないと、一部のアクターがメールボックス内のメッセージでいっぱいになる可能性があります (Country-Actor - 遅いもの) が、他のアクターのメールボックスが空であるため、メッセージがまだ入ってきます (IP-Actor - 速いもの)。
反応ストリーム仕様がそのような機能を提供するかどうか、誰でも提案できますか。そうでない場合は、最も効率的な方法で機能を実現する方法があります。
ありがとう。
javascript - RxJSストリームを一度にn個のアイテムを処理し、アイテムが完了すると再びn個に自動入力する方法は?
イベントのストリームがあり、これらのイベントごとに promise を返す関数を呼び出したいと考えています。問題は、この関数が非常に高価であるため、一度に最大 n 個のイベントを処理したいということです。
この小石の図はおそらく間違っていますが、これは私が望むものです:
これは私がこれまでに持っているコードです:
このコードには 2 つの問題があります。
inProgressCount
副作用関数で更新される var を使用しています。done$ サブスクリプションは、制御されたストリームから複数のアイテムを要求したときに 1 回だけ呼び出されます。これにより、inProgressCount
変数が誤って更新され、最終的にキューが一度に 1 つに制限されます。
ここで動作していることがわかります: http://jsbin.com/wivehonifi/1/edit?js,console,output
質問:
- より良いアプローチはありますか?
inProgressCount
どうすれば変数を取り除くことができますか?複数のアイテムを要求するときに done$ サブスクリプションが 1 回しか呼び出されないのはなぜですか?
更新:
質問 #3 への回答: switchMap は flatMapLatest と同じであるため、最後の 1 つしか取得できませんでした。コードを switchMap ではなく flatMap に更新しました。
javascript - RxJS v4 コードを v5 に変換し、「プル」でキューを処理する
観察/購読したい「作業キュー」があります。これは、処理するコマンド オブジェクトの配列です。通常、新しい作業項目は一気に到着し、順次処理する必要があります (完全に処理されるまで、受信した順序で一度に 1 つずつ)。
RxJS 5.0.0-beta.6 を使用しています。(他のライブラリによって課されたバージョン)
これは、私が望む動作を示す実際の例ですが、RxJS v4 を使用しています。
問題のメインコードはこれです...
http://jsbin.com/meyife/edit?js,出力
API の現在のベータ版の状態と、ドキュメントが不完全または変更されていることを考えると、RxJS 5 でこれを行う方法がわかりません。
更新: v4 から v5 への移行に関するこの移行ガイドには、削除された多くの機能が示されていますが、新しい方法で行う方法については説明されていません。削除された操作の例: .tap、.managed、.flatMapWithMaxConcurrent (名前が変更されました)。
java - バックプレッシャーはRxJavaで内部的にどのように発生しますか?
RxJava のバックプレッシャーに関するいくつかのドキュメントを読んでいますが、ライブラリの内部でどのように発生するかなどの詳細な説明を見つけることができません。「プロデューサー」は速すぎ、「コンシューマー」は遅すぎます。
たとえば、以下のコードのように:
私は RxJava ソースコードを調べてきたので、私の理解では、メインスレッドではミリ秒ごとにイベントを発行し、発行したら System.out.println(i) メソッドに値を渡し、それをnewThead スケジューラのスレッド プールを開き、ランナブル内でメソッドを実行します。
だから私の質問は、どのように例外が内部的に発生するのですか? Thread.sleep() を呼び出すと、メソッド呼び出しを処理するスレッドをスリープ状態にするだけです -> System.out.println() スレッド プール内の他のスレッドに影響を与えることなく、例外が発生します。スレッドプールに使用可能なスレッドが十分にないためですか?
ありがとう