問題タブ [project-reactor]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
spring - Spring Reactor で失われたメッセージ
Spring アプリケーションでリアクターに送信されたメッセージが失われているようです。これは、Spring コンテナーのライフサイクルでメッセージの生成を開始する時間が原因であると思われます。
私の特定のユースケースでは、起動時に初期化を実行する Spring Bean があります。この初期化の一部には、Reactor に送信されるメッセージの作成が含まれます。
Spring がコンシューマーを登録する順序に問題があり、コンシューマーが登録される前に初期化コードが実行されていると思われます。起動後にコードを手動で実行すると、たとえば、同じコードを呼び出すコントローラーにリクエストを送信すると、メッセージがコンシューマーに送信されます。
初期化コードが実行されるライフサイクルの時間を変更するさまざまな方法を試しました。たとえばApplicationListener<ContextRefreshedEvent>
、メソッドApplicationListener<ContextStartedEvent>
、@PostConstruct
実装InitializingBean
などです。これらのアプローチはどれも機能していないようです。
私のコンシューマーには @reactor.spring.annotation.Consumer のアノテーションが付けられ、@reactor.spring.annotation.Selector.Selector のメソッドにはアノテーションが付けられています。念のため、Spring Boot と自動構成された Reactor (@EnableReactor) を使用しています。
java - Stream への端末呼び出しは実行されません
Spring Reactor Stream API (rxjava に類似) を使用して、2 つのダウンストリーム サービスによって提供される応答をラップするサービスで応答オブジェクトを構築するのに苦労しています。
以下は、accept()
私の Channel Consumer のメソッドです。罪のない人を保護するために、いくつかの名前が変更されています..
したがって、 はFooRequest
多数の をラップしBarRequests
、それぞれに 1 つの関連付けられた Classify 要求と 1 つの関連付けられた Validate 要求があります。1) に変換するFooRequest
、2)FooRequest
を一連の に変換するBarRequests
、3) それぞれに対して 2 つのダウンストリーム リクエストを実行するBarRequest
、4) すべてのBarResponse
オブジェクトを全体的な応答に集約する、5) クライアントに応答を送り返す。
私が問題に遭遇するポイントは、toList()
決して実行されないように見えるメソッドです。を含む何かを試みるたびに、Promise
それは常に壊れているように見えますが、これも例外ではありません.
FooRequestFunction
、BarRequestStreamFunction
かなり単純で、問題なく動作するようです。それらのメソッド シグネチャは次のとおりです。
と:
DownstreamRequestZipFunction
次のようになります。
両方のダウンストリーム リクエスト関数が結果を返す限り、これは問題なく機能するようです。
最後に、連鎖呼び出しの最後にある Consumer には、次の署名があります。
これが行うのは、応答の約束を await() し、それらの応答すべてをチャネルに書き戻された単一の XML ドキュメントに集約することです。ロギングが発生しないため、実行がこのメソッドまで到達しないことがわかります。すべてが .toList() で停止しているようです。
このセットアップが実行されたように見える理由toList()
や、その後何かを知っている人はいますか?
編集:わかりました、もう少し情報があります。デバッグを容易にするためにアプリケーション内の各スレッドに命名規則を指定した後、accept() メソッドを実行するスレッドが「shared-1」で待機状態になり、そこにとどまっていることがわかります。これは、基になる Dispatcher がリングバッファ ディスパッチャであり、シングル スレッドであるという事実に関係している可能性があります。
アプローチが少し異なるようにコードを変更し、マルチスレッド ディスパッチャーを使用して、 を使用しPromise
ないようにしましたが、連鎖した一連の呼び出しの末尾が実行されない状態が残っています。下記参照:
上記では、toList() を reduce() の呼び出しに置き換え、すべてを 1 つの にまとめましたList<BarResponse>
。これが実行され、ログに記録されていることがわかります。ただし、consume()、consumeOn()などを試した後、最後の呼び出しで何をしても、実行されることはなく、上記の最後の呼び出しがログに記録されることもありません。
VisualVM を見ると、ディスパッチャー スレッドはすべて、ブロッキング キューに関連付けられた同じオブジェクト モニターで待機していることがわかります。つまり、すべてのスレッドが作業の到着を待機しています。末尾の consumerOn() 呼び出しが完全に無視されているようです。
ここで何が間違っていますか?私は何を理解していませんか?
編集 2: 以下の Johns の返信を考えると、問題はサーバーのセットアップにあると思われます。おそらくリアクター リリース 2.0.0.M2 専用で、メインApplication
クラスで次のように構成されます。
これにはディスパッチャが設定されておらず、内部では LMAX ディスラプタを使用しているようNettyEventLoopDispatcher
です。NettyEventLoopDispatcher
代わりのディスパッチャとして設定して使用する方法が明確ではありません。
spring-integration - リアクター StringUtils で Spring 統合 xml ファイルのエラーが発生する
全体的なアプリケーションコンテキスト内で子コンテキストとしてロードする春統合 sftp フローがあります。これは、動的 ftp SI の例に基づいています。私の統合フローには、reactor やストリームに関するものは何もありません。sftpサーバーにファイルを転送するためにdirect channel
接続するだけの簡単な流れです。sftp-outbound-gateway
単体テストを実行することもでき、フローは正常に動作します (ファイルを転送できます) が、完全な親アプリケーションをロードし、この sftp フローがロードされた状態で子コンテキストを初期化する統合テストを実行すると、エラーがスローされます。リアクター/StringUtils クラスを見つけることができません。
その理由は、spring-integration-sftp がリアクター jar を一時的な dep としてロードするためのようですが、私の親アプリケーションにはクラスパスに別のバージョンのリアクターが既にロードされているため、リアクター コアをスプリング統合 dep から除外しました。リアクターコアをスプリング統合から除外しないと、バージョンの競合が発生するため、除外したいと思います。
SI フローの初期化
統合テストを実行したときのエラー
org.springframework.beans.factory.BeanDefinitionStoreException: クラスパス リソース [adapters/sftp.xml] から XML ドキュメントを解析中に予期しない例外が発生しました。ネストされた例外は、org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:414) の java.lang.NoClassDefFoundError: reactor/util/StringUtils です。org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions( XmlBeanDefinitionReader.java:336) org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:304) org.springframework.beans.factory.support.AbstractBeanDefinitionReader.loadBeanDefinitions(AbstractBeanDefinitionReader.java:181)
最後にSIフロー
原子炉構成を追加するための更新
spring - リアクターとの春の統合sftp
この質問は、私が投稿した以前の質問の続きです。動作を確認し、おそらく何らかの解決策を見つけ出すために、github プロジェクトを作成したかったのです。ここにgithubプロジェクトがあります
リアクターの Spring サポートは、reactor 2.x バージョンに含まれているようです。Spring 統合 sftp は、reactor の 1.1.4 バージョンを使用し、reactor 2 以降のバージョンでは別のパッケージに移動された StringUtils クラスを参照します。
このバージョンの競合は、SI 4.2 で解決されるまでは避けられないようです。そのため、reactor 1.1.4 を使用して、スプリング リアクターのサポート (この事業)。この方法で SI sftp も機能します。
spring - リアクター コンシューマー向けのマルチスレッド
私は、reactor を使用してアプリケーション全体にイベントを発行し、さまざまなコンシューマーにイベントに応答させています。
これが私のリアクター構成です
デフォルトのリング バッファ ベースのディスパッチャが使用され、単一のコンシューマに送信された複数のメッセージが並行して処理されることを期待しています。代わりに、イベントを同期的に処理しているようです。スレッドshared-1
は、イベント 1 をコンシューマー 1 に処理するために使用され、イベント 1 の処理が完了した後でのみ、同じスレッドがコンシューマー 1 でイベント 2 の処理を開始します。
複数のイベントを複数のコンシューマーに送信し、すべてのイベントを並行して処理できるように、並列処理を実現するにはどうすればよいですか。
提案をいただければ幸いです。
これは、イベントバスにイベントをディスパッチする方法です
これが消費者の1人です
java - リアクター スレッド プール ディスパッチャー
リアクターのドキュメントを確認しましたが、特定の時点でディスパッチされるイベントの数を制御する方法がわかりません。私がやりたいのは、一部の消費者を忙しくしておくのに十分なイベントを大量にディスパッチすることですが、現時点で十分な数の消費者が働いていることを知る方法があるため、これ以上イベントをディスパッチする必要はありません。ディスパッチャを Threadpool を使用するように設定すると、ある時点でスレッドが割り当てられなくなり、拒否例外がスローされる可能性があると考えていました。このようにして、スレッド拒否例外を受け取るまでイベントをディスパッチし続けることができます。
これを行う方法はありますか、それともリアクターを使用すべきではない方法で使用していますか?
java - Reactor Spring による例外処理
私はReactor 2とSpring 4を使用しています。これが私が持っている典型的なコードです-Consumer
リポジトリでの作業
次に、リクエストを渡すコントローラーがあり、eventBus
そこにリクエストを渡し、Promise
物事はうまくいきますがApplicationService
、例外をスローする場合、Promise
値は設定されていませんが、コンソールで次のようになります:
質問は次のとおりです。
Reactor を
eventBus
間違った方法で使用していますか? もしそうなら、正しい方法は何ですかおそらくこの機能はまだ実装されていません
java - Reactor 2.0 を使用した Spring 4 でのマルチスレッド実行
Reactor 2xを既存のアプリケーションに統合して、互いに独立してフェッチできるリクエストSpring 4
の実行中のパフォーマンスを向上させようとしています。ジョブを複数のスレッドに並列化し、それらをバッファに結合するマップリデュースのようなものです。 .REST
resources
これまでのところ、このサンプルは非Spring環境で動作しています:
上記のサンプルでは、 を使用して変換を適用してから、 を メソッドresourceToMapFunction
と結合し、 をbuffer()
作成しPromise
て結果を待ち、 を返しますresult
。
私の最初の質問は、これが Reactor の想定される使用方法ですか? 変換が正しく適用されていることは知っていますが、おそらく、私はReactor
何かを正しい方法で使用していない初心者です。
私の2番目の質問は大したことではありませんが、入力Reactor
で提供されたのと同じ順序でプロジェクトに返すものはありますか? resources
これは複数のスレッドで実行されているので、答えはそうではないと確信しています.
最後の質問です。このコードをプロジェクトに導入すると、下層の変換を適用するSpring
ための依存関係が実行スレッドにないため、変換が失敗します。これは、Spring Reactor バージョンで簡単に実行できますか? Bean
もしそうなら、それを行う方法を示すリンクまたはドキュメントはありますか?
どうもありがとう!
ホセ・ルイス
java - Reactor 2.0 EventBus - 応答を処理するために集約および一時停止する方法
EventBus を使用して Reactor (2.X) でこのユースケースを達成する方法についての提案をいただければ幸いです。
メイン スレッドは、threadPoolExecutor を使用してトピック「foo」にイベントをディスパッチします。イベントは、「foo」にサブスクライブしたいくつかのコンシューマーに送信される場合があります。これらのコンシューマーは、イベント データを調べて、問題があるかどうかを判断します。
メインスレッドを一時停止して、条件のいずれかが満たされるまで待機させたい:
- すべてのコンシューマがイベントを取得し、実行を終了して問題を検出しません。メインスレッドが実行を再開します
- コンシューマの 1 つが問題を発見すると、メイン スレッドは例外をスローする必要があります
- コンシューマの 1 つで実行に時間がかかりすぎています。メイン スレッドは実行を再開する必要がありますが、警告をログに記録する必要があります
だから何か