問題タブ [reactive-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
2 に答える
140 参照

java - コールド観測量とホット観測量で構成された観測量

A指定されたコールドオブザーバブルからすべてのアイテムを放出し、その完了がホットオブザーバブルで継続するとすぐに、オブザーバブルを構成する適切な方法を見つけるのに苦労していBます。

これは私の特定の使用例です。追加専用データベース (イベント ストリーム) にリアルタイムでデータを追加するデータ コレクターがあります。そして、すべてのイベント ストリームをストリーミングするためのリクエストが到着すると、データベースからすべてのストリーミングを開始することが期待されます。データベースにデータがなくなるとすぐに、コレクター ストリームのストリーミングが開始されます...両方がオブザーバブルとして利用できることがわかります。

私はリアクティブ プログラミングが初めてなので、私の質問は少し抽象的かもしれません。どんな助けでも感謝します。

この動作の図を次に示します。

これRが観測可能な結果Aで、冷たいものBは熱いものです。Rで終了していBます。

0 投票する
1 に答える
202 参照

java - hot Observable のオンデマンド実行

冷たい例を考えると:

新しいサブスクライバーごとにゼロから実行を開始します。

サブスクライバーが早期にサブスクライブを解除すると、実行を停止できます。

サンプルの for ループの代わりに、実際のビジネス ロジックが進行している場合 (サブスクライバーごとにリプレイする必要はなく、代わりにリアルタイムである必要があります)、ホット オブザーバブルを扱っています...

開始したいときに実行する可能性があります

したがって、最初の質問:最初のオブザーバーがサブスクライブしたときにのみ作業を開始するにはどうすればよいですか? (コネクティブルオブザーバブルかな?)

そして重要な質問:最後のサブスクライバーがサブスクライブを解除したときに作業を停止するにはどうすればよいですか? (そのサブジェクトの現在のサブスクリプションにアクセスする方法がわかりません。そのようなソリューションが存在する場合、グローバル状態を共有せずにクリーンなソリューションを見つけたいと思います)

私が考えることができる1つの解決策は、サブスクライバーを管理するカスタムオペレーターで件名を持ち上げることです...

0 投票する
1 に答える
359 参照

rx-java - react-ipc、reactive stream io、reactive stream net、reactivesocketの関係は何ですか?

リアクティブストリームに関するGithubで多くのリポジトリを見つけましたが、似ているようです。

ただし、多くのリポジトリは数か月更新されていません。

それらの関係を知りたいので、最新のリポジトリに集中できます。

リアクティブ-ipc: https://github.com/reactive-ipc/reactive-ipc-jvm

反応ストリームネット: https://github.com/reactive-streams/reactive-streams-net-jvm

リアクティブソケット: https://github.com/ReactiveSocket/reactivesocket-java.git

0 投票する
2 に答える
920 参照

reactive-programming - RxJS v5 で Observable を一時停止してバッファリングする方法

特定の条件下で保留中のリクエストを数秒間一時的に保留するために、HTTPリクエストにバックプレッシャー戦略を実装しようとしています。一時停止するロジックは、別の Observable に基づいています。

私の研究と理解により、pausableBufferedオペレーターは私が必要としているものを正確に実行してくれると信じています。ここに文書化されていますhttp://reactivex.io/documentation/operators/backpressure.html

ただし、ReactiveX v5 (5.0.0-beta.0) ではこの演算子を見つけることができず、移行ガイド (v4 - v5) には削除されたことが示されているようです。この場合、v5 の使用可能なオペレーターを使用して、どうすれば目的の結果を得ることができますか?

0 投票する
1 に答える
274 参照

stream - Akka Reactive Streams プロセッサの目的

akka の Reactive Streams を理解しようとしています。このブログhttp://bryangilbert.com/blog/2015/02/04/akka-reactive-streams/を読んだことがありますが、それがどのように機能するかについての基本的なアイデアが得られたと思います。しかし、私が理解していないのは、この概念におけるプロセッサの目的です。それはなんのためですか?サブスクライバーが N オブジェクトを要求し、パブリッシャーが onNext() を使用してそれらを送信するだけで十分ではないでしょうか?

0 投票する
1 に答える
304 参照

akka - Akka Streams Source の要素を結合することは可能ですか?

彼のような情報源がある場合:

特定の区切りパターンで要素を結合して再分割することは可能ですか?たとえば、「\ n」文字で、結果としてこのようなものになりますか? :

ありがとう!

0 投票する
1 に答える
1750 参照

scala - グラフ作成のエラー: 要件が失敗しました: インレット [] とアウトレット [] は、インレット [イン] とアウトレット [アウト] に対応する必要があります

akka ストリームの graphDSL を使用して、実行可能なグラフを作成しています。ストリームコンポーネントのインレット/アウトレットに関してコンパイル時のエラーはありません。ランタイムは次のエラーをスローします:

実行するには何を確認すればよいですか?

グラフ構造:

0 投票する
1 に答える
347 参照

scala - 実行中の akka ストリーム グラフを故意に停止する

次の単純なグラフがあるとします。

私たちはそれを次のように実行しています

次の使用可能な要素をプッシュする代わりに、kafkaSource に停止 (または人為的に完了) するように通知して、下流の接続されたステージも停止するようにしたいと考えています。

どうすればそれを達成できますか?

シナリオは、カフカに何百万ものメッセージがあり、毎日午後 9 時にメッセージの処理を停止したいと考えており、クリーン シャットダウンで実行中のアプリケーションを停止していると仮定します。

0 投票する
1 に答える
281 参照

akka-stream - Akka Streams 2.4.2 で RunnableGraph を実行し、インレット/アウトレットが正しく構成されていることを確認する方法は?

GraphDSL.create() を使用して RunnableGraph を構成しました。また、ClosedShape を指定し、すべてのアウトレット/インレットを接続しました。プログラムを実行しようとすると、次の実行時例外が発生します。

requirement failed: The inlets [] and outlets [] must correspond to the inlets [filter.in] and outlets [out]

インレットとアウトレットを正しく接続していない場所はありますか?

グラフコードは次のとおりです。

0 投票する
1 に答える
463 参照

scala - Akka-http: Http.cachedHostConnectionPoolHttps は、2XX 以外の応答に対してしばらくすると処理を停止します

cachedHostConnectionPoolHttps を使用してアウトバウンドの https リクエストを作成しようとしていますが、2XX 以外の応答でしばらくすると、プール フローが要素の出力を停止し、フロー全体が停止するようです。この特定の動作が発生する時間は非常にランダムですが、発生し、再現可能です。

これは、しばらくExpected OnNext(_), yet no element signaled during 10 secondsスローされた後に結果の発行を停止するサンプル テストです。

ここで何が間違っているのか?しばらくは機能するので、2xx以外のエラーの処理中に何か問題が発生していると思われます。

と をオフにしてみましakka.http.host-connection-pool.max-retries = 0akka.http.host-connection-pool.idle-timeout = infiniteが、結果はありません。