問題タブ [reactive-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
3 に答える
10037 参照

java - エラー時にMonoからMonoを返す方法は?

0 投票する
2 に答える
207 参照

akka - Source.asSubscriber を使用してリアクティブ リスナーをラップするにはどうすればよいですか?

Source.asSubscriberリアクティブリスナーをラップするにはどうすればよいですか? 私はその利点を理解できません。

asynchttpclient WebSocketSource[T]用に作成しようとしています。これが私のコードです:

最初のイベントで例外が発生します:

おそらくSource.asSubscriber私にとって悪い選択ですか?reactstreams Subscriber を akka の Source にラップするにはどうすればよいですか?

0 投票する
1 に答える
120 参照

scala - フロー内の最新のアイテムでリクエストを完了

GETフローで利用可能な最新のアイテムでリクエストを完了したいと考えています。特にこのフローは、アクターによって生成され、WebSocket によって既に個別に消費されているイベントを集約します。

イベントを次のように表すことができるとしましょう。

最初に行うことはSourceQueue、アクターがイベントをプッシュする場所とハブを作成して、さまざまなクライアントがこれらのイベントを個別に受信できるようにすることです。

次に、WebSocket 経由でイベントを提供するサービスにイベントをプッシュしqueueて渡すことができるアクターを作成できます。hub

これは、同時に複数の消費者でも問題なく機能します。

次にやりたいことは、イベントを消費し、hubID ごとに最新のイベントのリストを生成し、GETエンドポイント経由で提供するサービスを用意することです。

これを解決するためにいくつかのアプローチを試しました。私が試みた2つのアプローチは次のとおりです。

  • プライベート変数を更新するフローを実行する
  • last要素を返すシンクで完了

プライベート変数を更新するフローを実行する

これは、実際に私が試した最後のアプローチです。私が気づいた奇妙な (またはそうですか?) 考えは、実際には何もログに記録されないことです (コンビネーターを通過するものはログに記録されるべきではありませんlogか?)。

このアプローチを使用した結果latestは常にnullであり、応答は常に空です。

「ボックス」アクターを使用して集約をパイプする同様のアプローチも試しましたが、効果は同じです。

last要素を返すシンクを完成させる

これは私が取ろうとした最初のアプローチです。その結果、タイムアウトに達するまで応答がハングし、Akka HTTP がブラウザーに 500 を返します。

0 投票する
3 に答える
9099 参照

spring-webflux - プロジェクトリアクター - Mono と Flux を組み合わせるには?

私は Flux と Mono を持っていますが、Flux の各アイテムにモノの値を持たせるためにそれらを組み合わせる方法がわかりません。

私はこのアプローチを試みていますが、うまくいきません:

0 投票する
1 に答える
623 参照

rx-java - Vert.x RxJava および Reactive Streams API

Vert.x は初めてで、現在調査中です。vert.x 3 には、1) 通常の頂点 API を使用する、2) RxJava ベースの API を使用する、3) Reactive Streams ベースの API を使用する、3 つの方法があることがわかりました。

すべての頂点モジュールが RxJava と Reactive Streams API で利用できるのか、それともリアクティブ バージョンが利用できないモジュールがまだいくつかあるのかを知りたいですか? また、#1 に対して #2 または #3 を使用することの欠点は何でしょうか?

0 投票する
1 に答える
1341 参照

java - FluxでsubscribeとblockLastの両方を呼び出す方法は?

私は Project Reactor とリアクティブ ストリーム全般を試してきました。subscribeOnを使用して別のスレッドでストリームを実行するときに問題が発生しました。コードをメインに置くと、ストリームが終了するまでメインスレッドブロックが必要になるため、次のようにしました。

blockLast()次に、ブロッキングを行うメソッドがあることに気付きました。しかし、 subscribe と blockLast は返さないため、両方を使用できませんでしたFlux

これを行う優雅な方法はありますか?

0 投票する
0 に答える
1836 参照

spring - Spring 5.1 でのテストの失敗

サンプル コードを 5.1 および Spring Boot 2.1 にアップグレードしようとしました。Reactor Netty は新しいバージョンでリファクタリングされ、いくつかの API が変更されました。Reactor Netty の新しいバージョンに従ってコードを変更しました。

HttpServerリアクターネッティ用。

統合テスト:

テストを実行すると、このような例外が発生しました。

更新:テストコードをリファクタリングすることで解決しました。

0 投票する
1 に答える
594 参照

spring-boot - Spring Reactive ストリームが Netflix Zuul リバース プロキシで動作しない

プロジェクト用に 7 つのマイクロサービスを作成し、すべて zuul プロキシを介して公開しています。

マイクロ サービスの 1 つ (ライブ ダッシュボード) には、(サーバー送信イベント) を使用してライブ ダッシュボードを更新するための Spring ブート リアクティブ ストリームが含まれています。

リアクティブ ストリームは、ライブ ダッシュボード マイクロサービス ダイレクト レスト API ( http://localhost:8092/live/dashboard ) を介して正常に動作しています。

しかし、ストリームは zuul プロキシ ( http://localhost:8091/rest/livedashboard/live/dashboard ) を介して機能しておらず、通常の API 応答を返しています。

リアクティブ ストリームが zuul プロキシを介してどのように機能しているか、アイデアをお持ちの方に提案をお願いします。

0 投票する
3 に答える
1186 参照

scala - fs2 のリアクティブ ストリームに要素を外部からプッシュする

次のような外部 (つまり、変更できない) Java API があります。

Sender各イベントを受け入れ、それを JSON オブジェクトに変換し、いくつかのイベントを 1 つのバンドルに収集し、HTTP 経由でエンドポイントに送信するを実装する必要があります。send()これはすべて、呼び出しスレッドをブロックせずに、固定サイズのバッファーを使用して、バッファーがいっぱいになった場合に新しいイベントをドロップすることなく、非同期で実行する必要があります。

akka-streams を使用すると、これは非常に簡単です: ステージのグラフ (akka-http を使用して HTTP リクエストを送信する) を作成し、それを具体化し、具体化されたものを使用してActorRef新しいイベントをストリームにプッシュします。

これは、ライブラリが提供するものに非常に似ていますが、特定のニーズに合わせて調整されCustomBufferたカスタムです。おそらく、この特定の質問には関係ありません。GraphStageBuffer

ご覧のとおり、ストリーム以外のコードからのストリームとのやり取りは非常に簡単です。トレイトの!メソッドActorRefは非同期であり、追加の機械を呼び出す必要はありません。アクターに送信される各イベントは、リアクティブ パイプライン全体を通じて処理されます。さらに、akka-http がどのように実装されているかにより、接続プールも無料で取得できるため、サーバーに対して複数の接続が開かれることはありません。

しかし、FS2 で同じことを適切に行う方法が見つかりません。バッファリング (おそらく、必要なPipe追加処理を行うカスタム実装を作成する必要があるでしょう) と HTTP 接続プーリングの問題を捨てても、もっと基本的なこと、つまり、データを「外部から」のリアクティブ ストリーム。

私が見つけることができるすべてのチュートリアルとドキュメントは、プログラム全体が何らかの効果コンテキスト内で発生することを前提としていますIO。これは私の場合ではありません。send()メソッドは、指定されていない時間に Java ライブラリによって呼び出されます。したがって、すべてを 1 つのIOアクション内に保持することはできません。必ずメソッド内で「プッシュ」アクションを終了しsend()、リアクティブ ストリームを別のエンティティとして持つ必要があります。これは、イベントを集約し、できれば HTTP 接続をプールしたいからです (これは私が信じていることです)。は自然に反応ストリームに結び付けられます)。

のような追加のデータ構造が必要だと思いますQueue。fs2 には確かに何らかの種類がありfs2.concurrent.Queueますが、すべてのドキュメントは単一のIOコンテキスト内でそれを使用する方法を示しているため、次のようなことを行うと想定しています

次にqueue、ストリーム定義内で使用し、send()メソッド内で別のunsafeRun呼び出しを使用します。

は正しい方法ではなく、おそらくうまくいかないでしょう。

それで、私の質問は、fs2を適切に使用して問題を解決するにはどうすればよいですか?

0 投票する
1 に答える
859 参照

kotlin - 複数のフローアブルを組み合わせる

私は自分のプロジェクトを Spring から Ktor に移行しており、最初は Reactor であったリアクティブ ストリームの実装を RxJava 2 に置き換えることにしました。これがどのように見えるかです:

問題は、 return の各呼び出しです。最後にそれらを 1 つのストリームに結合するのに役立つ演算子はありますかacquireSomethingFromSomewhere? Flowable<Some>Reactor で使用したのは次のとおりです。

Publisherしかし、RxJavaでは、それぞれが引数として取り、それをFlowable実装していないため、私の問題を解決できる演算子を見つけることができません。