問題タブ [project-reactor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
317 参照

spring - Spring Reactor で失われたメッセージ

Spring アプリケーションでリアクターに送信されたメッセージが失われているようです。これは、Spring コンテナーのライフサイクルでメッセージの生成を開始する時間が原因であると思われます。

私の特定のユースケースでは、起動時に初期化を実行する Spring Bean があります。この初期化の一部には、Reactor に送信されるメッセージの作成が含まれます。

Spring がコンシューマーを登録する順序に問題があり、コンシューマーが登録される前に初期化コードが実行されていると思われます。起動後にコードを手動で実行すると、たとえば、同じコードを呼び出すコントローラーにリクエストを送信すると、メッセージがコンシューマーに送信されます。

初期化コードが実行されるライフサイクルの時間を変更するさまざまな方法を試しました。たとえばApplicationListener<ContextRefreshedEvent>、メソッドApplicationListener<ContextStartedEvent>@PostConstruct実装InitializingBeanなどです。これらのアプローチはどれも機能していないようです。

私のコンシューマーには @reactor.spring.annotation.Consumer のアノテーションが付けられ、@reactor.spring.annotation.Selector.Selector のメソッドにはアノテーションが付けられています。念のため、Spring Boot と自動構成された Reactor (@EnableReactor) を使用しています。

0 投票する
1 に答える
233 参照

java - Stream への端末呼び出しは実行されません

Spring Reactor Stream API (rxjava に類似) を使用して、2 つのダウンストリーム サービスによって提供される応答をラップするサービスで応答オブジェクトを構築するのに苦労しています。

以下は、accept()私の Channel Consumer のメソッドです。罪のない人を保護するために、いくつかの名前が変更されています..

したがって、 はFooRequest多数の をラップしBarRequests、それぞれに 1 つの関連付けられた Classify 要求と 1 つの関連付けられた Validate 要求があります。1) に変換するFooRequest、2)FooRequestを一連の に変換するBarRequests、3) それぞれに対して 2 つのダウンストリーム リクエストを実行するBarRequest、4) すべてのBarResponseオブジェクトを全体的な応答に集約する、5) クライアントに応答を送り返す。

私が問題に遭遇するポイントは、toList()決して実行されないように見えるメソッドです。を含む何かを試みるたびに、Promiseそれは常に壊れているように見えますが、これも例外ではありません.

FooRequestFunctionBarRequestStreamFunctionかなり単純で、問題なく動作するようです。それらのメソッド シグネチャは次のとおりです。

と:

DownstreamRequestZipFunction次のようになります。

両方のダウンストリーム リクエスト関数が結果を返す限り、これは問題なく機能するようです。

最後に、連鎖呼び出しの最後にある Consumer には、次の署名があります。

これが行うのは、応答の約束を await() し、それらの応答すべてをチャネルに書き戻された単一の XML ドキュメントに集約することです。ロギングが発生しないため、実行がこのメソッドまで到達しないことがわかります。すべてが .toList() で停止しているようです。

このセットアップが実行されたように見える理由toList()や、その後何かを知っている人はいますか?

編集:わかりました、もう少し情報があります。デバッグを容易にするためにアプリケーション内の各スレッドに命名規則を指定した後、accept() メソッドを実行するスレッドが「shared-1」で待機状態になり、そこにとどまっていることがわかります。これは、基になる Dispatcher がリングバッファ ディスパッチャであり、シングル スレッドであるという事実に関係している可能性があります。

アプローチが少し異なるようにコードを変更し、マルチスレッド ディスパッチャーを使用して、 を使用しPromiseないようにしましたが、連鎖した一連の呼び出しの末尾が実行されない状態が残っています。下記参照:

上記では、toList() を reduce() の呼び出しに置き換え、すべてを 1 つの にまとめましたList<BarResponse>。これが実行され、ログに記録されていることがわかります。ただし、consume()、consumeOn()などを試した後、最後の呼び出しで何をしても、実行されることはなく、上記の最後の呼び出しがログに記録されることもありません。

VisualVM を見ると、ディスパッチャー スレッドはすべて、ブロッキング キューに関連付けられた同じオブジェクト モニターで待機していることがわかります。つまり、すべてのスレッドが作業の到着を待機しています。末尾の consumerOn() 呼び出しが完全に無視されているようです。

ここで何が間違っていますか?私は何を理解していませんか?

編集 2: 以下の Johns の返信を考えると、問題はサーバーのセットアップにあると思われます。おそらくリアクター リリース 2.0.0.M2 専用で、メインApplicationクラスで次のように構成されます。

これにはディスパッチャが設定されておらず、内部では LMAX ディスラプタを使用しているようNettyEventLoopDispatcherです。NettyEventLoopDispatcher代わりのディスパッチャとして設定して使用する方法が明確ではありません。

0 投票する
1 に答える
787 参照

spring-integration - リアクター StringUtils で Spring 統合 xml ファイルのエラーが発生する

全体的なアプリケーションコンテキスト内で子コンテキストとしてロードする春統合 sftp フローがあります。これは、動的 ftp SI の例に基づいています。私の統合フローには、reactor やストリームに関するものは何もありません。sftpサーバーにファイルを転送するためにdirect channel接続するだけの簡単な流れです。sftp-outbound-gateway単体テストを実行することもでき、フローは正常に動作します (ファイルを転送できます) が、完全な親アプリケーションをロードし、この sftp フローがロードされた状態で子コンテキストを初期化する統合テストを実行すると、エラーがスローされます。リアクター/StringUtils クラスを見つけることができません。

その理由は、spring-integration-sftp がリアクター jar を一時的な dep としてロードするためのようですが、私の親アプリケーションにはクラスパスに別のバージョンのリアクターが既にロードされているため、リアクター コアをスプリング統合 dep から除外しました。リアクターコアをスプリング統合から除外しないと、バージョンの競合が発生するため、除外したいと思います。

SI フローの初期化

統合テストを実行したときのエラー

org.springframework.beans.factory.BeanDefinitionStoreException: クラスパス リソース [adapters/sftp.xml] から XML ドキュメントを解析中に予期しない例外が発生しました。ネストされた例外は、org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:414) の java.lang.NoClassDefFoundError: reactor/util/StringUtils です。org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions( XmlBeanDefinitionReader.java:336) org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:304) org.springframework.beans.factory.support.AbstractBeanDefinitionReader.loadBeanDefinitions(AbstractBeanDefinitionReader.java:181)

最後にSIフロー


原子炉構成を追加するための更新

0 投票する
1 に答える
110 参照

spring - リアクターとの春の統合sftp

この質問は、私が投稿した以前の質問の続きです。動作を確認し、おそらく何らかの解決策を見つけ出すために、github プロジェクトを作成したかったのです。ここにgithubプロジェクトがあります

リアクターの Spring サポートは、reactor 2.x バージョンに含まれているようです。Spring 統合 sftp は、reactor の 1.1.4 バージョンを使用し、reactor 2 以降のバージョンでは別のパッケージに移動された StringUtils クラスを参照します。

このバージョンの競合は、SI 4.2 で解決されるまでは避けられないようです。そのため、reactor 1.1.4 を使用して、スプリング リアクターのサポート (この事業)。この方法で SI sftp も機能します。

0 投票する
1 に答える
2314 参照

spring - リアクター コンシューマー向けのマルチスレッド

私は、reactor を使用してアプリケーション全体にイベントを発行し、さまざまなコンシューマーにイベントに応答させています。

これが私のリアクター構成です

デフォルトのリング バッファ ベースのディスパッチャが使用され、単一のコンシューマに送信された複数のメッセージが並行して処理されることを期待しています。代わりに、イベントを同期的に処理しているようです。スレッドshared-1は、イベント 1 をコンシューマー 1 に処理するために使用され、イベント 1 の処理が完了した後でのみ、同じスレッドがコンシューマー 1 でイベント 2 の処理を​​開始します。

複数のイベントを複数のコンシューマーに送信し、すべてのイベントを並行して処理できるように、並列処理を実現するにはどうすればよいですか。

提案をいただければ幸いです。

これは、イベントバスにイベントをディスパッチする方法です

これが消費者の1人です

0 投票する
1 に答える
464 参照

java - リアクター スレッド プール ディスパッチャー

リアクターのドキュメントを確認しましたが、特定の時点でディスパッチされるイベントの数を制御する方法がわかりません。私がやりたいのは、一部の消費者を忙しくしておくのに十分なイベントを大量にディスパッチすることですが、現時点で十分な数の消費者が働いていることを知る方法があるため、これ以上イベントをディスパッチする必要はありません。ディスパッチャを Threadpool を使用するように設定すると、ある時点でスレッドが割り当てられなくなり、拒否例外がスローされる可能性があると考えていました。このようにして、スレッド拒否例外を受け取るまでイベントをディスパッチし続けることができます。

これを行う方法はありますか、それともリアクターを使用すべきではない方法で使用していますか?

0 投票する
2 に答える
7090 参照

java - Reactor Spring による例外処理

私はReactor 2とSpring 4を使用しています。これが私が持っている典型的なコードです-Consumerリポジトリでの作業

次に、リクエストを渡すコントローラーがあり、eventBusそこにリクエストを渡し、Promise

物事はうまくいきますがApplicationService、例外をスローする場合、Promise値は設定されていませんが、コンソールで次のようになります:

質問は次のとおりです。

  1. Reactor をeventBus間違った方法で使用していますか? もしそうなら、正しい方法は何ですか

  2. おそらくこの機能はまだ実装されていません

0 投票する
2 に答える
896 参照

java - Reactor 2.0 を使用した Spring 4 でのマルチスレッド実行

Reactor 2xを既存のアプリケーションに統合して、互いに独立してフェッチできるリクエストSpring 4の実行中のパフォーマンスを向上させようとしています。ジョブを複数のスレッドに並列化し、それらをバッファに結合するマップリデュースのようなものです。 .RESTresources

これまでのところ、このサンプルは非Spring環境で動作しています:

上記のサンプルでは、​​ を使用して変換を適用してから、 を メソッドresourceToMapFunctionと結合し、 をbuffer()作成しPromiseて結果を待ち、 を返しますresult

私の最初の質問は、これが Reactor の想定される使用方法ですか? 変換が正しく適用されていることは知っていますが、おそらく、私はReactor何かを正しい方法で使用していない初心者です。

私の2番目の質問は大したことではありませんが、入力Reactorで提供されたのと同じ順序でプロジェクトに返すものはありますか? resourcesこれは複数のスレッドで実行されているので、答えはそうではないと確信しています.

最後の質問です。このコードをプロジェクトに導入すると、下層の変換を適用するSpringための依存関係が実行スレッドにないため、変換が失敗します。これは、Spring Reactor バージョンで簡単に実行できますか? Beanもしそうなら、それを行う方法を示すリンクまたはドキュメントはありますか?

どうもありがとう!

ホセ・ルイス

0 投票する
0 に答える
258 参照

java - Reactor 2.0 EventBus - 応答を処理するために集約および一時停止する方法

EventBus を使用して Reactor (2.X) でこのユースケースを達成する方法についての提案をいただければ幸いです。

メイン スレッドは、threadPoolExecutor を使用してトピック「foo」にイベントをディスパッチします。イベントは、「foo」にサブスクライブしたいくつかのコンシューマーに送信される場合があります。これらのコンシューマーは、イベント データを調べて、問題があるかどうかを判断します。

メインスレッドを一時停止して、条件のいずれかが満たされるまで待機させたい:

  1. すべてのコンシューマがイベントを取得し、実行を終了して問題を検出しません。メインスレッドが実行を再開します
  2. コンシューマの 1 つが問題を発見すると、メイン スレッドは例外をスローする必要があります
  3. コンシューマの 1 つで実行に時間がかかりすぎています。メイン スレッドは実行を再開する必要がありますが、警告をログに記録する必要があります

だから何か