問題タブ [reactive-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
akka - Source.asSubscriber を使用してリアクティブ リスナーをラップするにはどうすればよいですか?
Source.asSubscriber
リアクティブリスナーをラップするにはどうすればよいですか? 私はその利点を理解できません。
asynchttpclient WebSocketSource[T]
用に作成しようとしています。これが私のコードです:
最初のイベントで例外が発生します:
おそらくSource.asSubscriber
私にとって悪い選択ですか?reactstreams Subscriber を akka の Source にラップするにはどうすればよいですか?
scala - フロー内の最新のアイテムでリクエストを完了
GET
フローで利用可能な最新のアイテムでリクエストを完了したいと考えています。特にこのフローは、アクターによって生成され、WebSocket によって既に個別に消費されているイベントを集約します。
イベントを次のように表すことができるとしましょう。
最初に行うことはSourceQueue
、アクターがイベントをプッシュする場所とハブを作成して、さまざまなクライアントがこれらのイベントを個別に受信できるようにすることです。
次に、WebSocket 経由でイベントを提供するサービスにイベントをプッシュしqueue
て渡すことができるアクターを作成できます。hub
これは、同時に複数の消費者でも問題なく機能します。
次にやりたいことは、イベントを消費し、hub
ID ごとに最新のイベントのリストを生成し、GET
エンドポイント経由で提供するサービスを用意することです。
これを解決するためにいくつかのアプローチを試しました。私が試みた2つのアプローチは次のとおりです。
- プライベート変数を更新するフローを実行する
last
要素を返すシンクで完了
プライベート変数を更新するフローを実行する
これは、実際に私が試した最後のアプローチです。私が気づいた奇妙な (またはそうですか?) 考えは、実際には何もログに記録されないことです (コンビネーターを通過するものはログに記録されるべきではありませんlog
か?)。
このアプローチを使用した結果latest
は常にnull
であり、応答は常に空です。
「ボックス」アクターを使用して集約をパイプする同様のアプローチも試しましたが、効果は同じです。
last
要素を返すシンクを完成させる
これは私が取ろうとした最初のアプローチです。その結果、タイムアウトに達するまで応答がハングし、Akka HTTP がブラウザーに 500 を返します。
spring-webflux - プロジェクトリアクター - Mono と Flux を組み合わせるには?
私は Flux と Mono を持っていますが、Flux の各アイテムにモノの値を持たせるためにそれらを組み合わせる方法がわかりません。
私はこのアプローチを試みていますが、うまくいきません:
rx-java - Vert.x RxJava および Reactive Streams API
Vert.x は初めてで、現在調査中です。vert.x 3 には、1) 通常の頂点 API を使用する、2) RxJava ベースの API を使用する、3) Reactive Streams ベースの API を使用する、3 つの方法があることがわかりました。
すべての頂点モジュールが RxJava と Reactive Streams API で利用できるのか、それともリアクティブ バージョンが利用できないモジュールがまだいくつかあるのかを知りたいですか? また、#1 に対して #2 または #3 を使用することの欠点は何でしょうか?
java - FluxでsubscribeとblockLastの両方を呼び出す方法は?
私は Project Reactor とリアクティブ ストリーム全般を試してきました。subscribeOn
を使用して別のスレッドでストリームを実行するときに問題が発生しました。コードをメインに置くと、ストリームが終了するまでメインスレッドブロックが必要になるため、次のようにしました。
blockLast()
次に、ブロッキングを行うメソッドがあることに気付きました。しかし、 subscribe と blockLast は返さないため、両方を使用できませんでしたFlux
。
これを行う優雅な方法はありますか?
spring - Spring 5.1 でのテストの失敗
サンプル コードを 5.1 および Spring Boot 2.1 にアップグレードしようとしました。Reactor Netty は新しいバージョンでリファクタリングされ、いくつかの API が変更されました。Reactor Netty の新しいバージョンに従ってコードを変更しました。
HttpServer
リアクターネッティ用。
統合テスト:
テストを実行すると、このような例外が発生しました。
更新:テストコードをリファクタリングすることで解決しました。
spring-boot - Spring Reactive ストリームが Netflix Zuul リバース プロキシで動作しない
プロジェクト用に 7 つのマイクロサービスを作成し、すべて zuul プロキシを介して公開しています。
マイクロ サービスの 1 つ (ライブ ダッシュボード) には、(サーバー送信イベント) を使用してライブ ダッシュボードを更新するための Spring ブート リアクティブ ストリームが含まれています。
リアクティブ ストリームは、ライブ ダッシュボード マイクロサービス ダイレクト レスト API ( http://localhost:8092/live/dashboard ) を介して正常に動作しています。
しかし、ストリームは zuul プロキシ ( http://localhost:8091/rest/livedashboard/live/dashboard ) を介して機能しておらず、通常の API 応答を返しています。
リアクティブ ストリームが zuul プロキシを介してどのように機能しているか、アイデアをお持ちの方に提案をお願いします。
scala - fs2 のリアクティブ ストリームに要素を外部からプッシュする
次のような外部 (つまり、変更できない) Java API があります。
Sender
各イベントを受け入れ、それを JSON オブジェクトに変換し、いくつかのイベントを 1 つのバンドルに収集し、HTTP 経由でエンドポイントに送信するを実装する必要があります。send()
これはすべて、呼び出しスレッドをブロックせずに、固定サイズのバッファーを使用して、バッファーがいっぱいになった場合に新しいイベントをドロップすることなく、非同期で実行する必要があります。
akka-streams を使用すると、これは非常に簡単です: ステージのグラフ (akka-http を使用して HTTP リクエストを送信する) を作成し、それを具体化し、具体化されたものを使用してActorRef
新しいイベントをストリームにプッシュします。
これは、ライブラリが提供するものに非常に似ていますが、特定のニーズに合わせて調整されCustomBuffer
たカスタムです。おそらく、この特定の質問には関係ありません。GraphStage
Buffer
ご覧のとおり、ストリーム以外のコードからのストリームとのやり取りは非常に簡単です。トレイトの!
メソッドActorRef
は非同期であり、追加の機械を呼び出す必要はありません。アクターに送信される各イベントは、リアクティブ パイプライン全体を通じて処理されます。さらに、akka-http がどのように実装されているかにより、接続プールも無料で取得できるため、サーバーに対して複数の接続が開かれることはありません。
しかし、FS2 で同じことを適切に行う方法が見つかりません。バッファリング (おそらく、必要なPipe
追加処理を行うカスタム実装を作成する必要があるでしょう) と HTTP 接続プーリングの問題を捨てても、もっと基本的なこと、つまり、データを「外部から」のリアクティブ ストリーム。
私が見つけることができるすべてのチュートリアルとドキュメントは、プログラム全体が何らかの効果コンテキスト内で発生することを前提としていますIO
。これは私の場合ではありません。send()
メソッドは、指定されていない時間に Java ライブラリによって呼び出されます。したがって、すべてを 1 つのIO
アクション内に保持することはできません。必ずメソッド内で「プッシュ」アクションを終了しsend()
、リアクティブ ストリームを別のエンティティとして持つ必要があります。これは、イベントを集約し、できれば HTTP 接続をプールしたいからです (これは私が信じていることです)。は自然に反応ストリームに結び付けられます)。
のような追加のデータ構造が必要だと思いますQueue
。fs2 には確かに何らかの種類がありfs2.concurrent.Queue
ますが、すべてのドキュメントは単一のIO
コンテキスト内でそれを使用する方法を示しているため、次のようなことを行うと想定しています
次にqueue
、ストリーム定義内で使用し、send()
メソッド内で別のunsafeRun
呼び出しを使用します。
は正しい方法ではなく、おそらくうまくいかないでしょう。
それで、私の質問は、fs2を適切に使用して問題を解決するにはどうすればよいですか?
kotlin - 複数のフローアブルを組み合わせる
私は自分のプロジェクトを Spring から Ktor に移行しており、最初は Reactor であったリアクティブ ストリームの実装を RxJava 2 に置き換えることにしました。これがどのように見えるかです:
問題は、 return の各呼び出しです。最後にそれらを 1 つのストリームに結合するのに役立つ演算子はありますかacquireSomethingFromSomewhere
? Flowable<Some>
Reactor で使用したのは次のとおりです。
Publisher
しかし、RxJavaでは、それぞれが引数として取り、それをFlowable
実装していないため、私の問題を解決できる演算子を見つけることができません。