問題タブ [rx.net]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
c# - LinqToTwitter - 同時実行性を考慮してオブザーバブル ストリームを正しく破棄する方法
IObservable<Tweet>
以下に示すように、LinqToTwitter で観察可能なコレクションを作成しました。問題は、最初のオブザーバブルを破棄して新しいオブザーバブルをサブスクライブするときに、この実装に同時実行の問題があることです。
最初のオブザーバブルを正しく処分するにはどうすればよいですか?
(以下のサンプルは完全であり、そのまま動作するはずです。参照パッケージと Twitter 資格情報を追加するだけです。)
この問題が発生する例を次に示します。
StartAsync
2 番目のオブザーバブルが作成される前に、最初のオブザーバブルの作成のクロージャーのメソッドが実行された場合はdisposed
、に設定されtrue
、すべて問題ありません。
ただし、最初のクロージャーの次の実行の前に 2 番目のオブザーバブルが作成された場合、StartAsync
disposed
再び false に設定され、s.CloseStream();
呼び出されることはありません。
オブザーバブルの作成は次のとおりです。
そして最後にTweet
クラス:
c# - Reactive Extensions (RX) では、「一時停止」コマンドを追加できますか?
イベントのストリームを取り込み、別のイベントのストリームをプッシュするクラスがあります。
すべてのイベントで Reactive Extensions (RX) が使用されます。イベントの着信ストリームは外部ソースからIObserver<T>
using.OnNext
にプッシュされ、イベントの発信ストリームは and を使用してプッシュされIObservable<T>
ます.Subscribe
。私はSubject<T>
舞台裏でこれを管理するために使用しています。
出力を一時的に一時停止するための RX の手法にはどのようなものがあるのだろうかと考えています。これは、着信イベントが内部キューに蓄積され、一時停止が解除されると、イベントが再び流出することを意味します。
c# - 単一の同時実行制限で、Rx を使用して定期的なタスクを実行する良い方法は何ですか?
特定の時点で実行されるメソッドの実行は最大でも 1 つだけという制限付きで、定期的なタスクを実行したいと考えています。
Rx を試していましたが、最大 1 回の同時実行制限を課す方法がわかりません。
さらに、タスクがまだ実行されている場合は、後続のスケジュールが経過するようにします。つまり、タスクがキューに入れられて問題が発生するのは望ましくありません。
定期的に実行するタスクが 2 つあります。実行中のタスクは現在同期しています。ただし、必要に応じて非同期にすることもできます。
c# - FromAsyncPatternを介してObservableを使用してストリームから読み取る、適切に閉じる/キャンセルする方法
ニーズ: TCP 接続を使用した長期実行プログラム
AC# 4.0 (VS1010、XP) プログラムは、TCP を使用してホストに接続し、バイトを送受信し、場合によっては接続を適切に閉じてから再度開く必要があります。周囲のコードは Rx.NetObservable
スタイルを使用して記述されています。データの量は少ないですが、プログラムは継続的に実行する必要があります (リソースを適切に破棄することでメモリ リークを回避します)。
以下のテキストは、私が検索して見つけたものを説明するため、長いです。これで動作するようになりました。
全体的な質問は次のとおりです。Rx は直感的でない場合があるため、解決策は適切ですか? それは信頼できますか (たとえば、何年も問題なく動作しますか)?
これまでの解決策
送信
NetworkStream
プログラムは次のように取得します。
非同期送信は簡単です。Rx.Net では、従来のソリューションよりもはるかに短くクリーンなコードでこれを処理できます。で専用スレを立てましたEventLoopScheduler
。送信が必要な操作は、 を使用して表現されIObservable
ます。ObserveOn(sendRecvThreadScheduler)
すべての送信操作がそのスレッドで行われるという保証を使用します。
これまでのところ、これは優れており、完璧です。
受け取る
Rx.Net は、データを受信するために、従来のソリューションよりも短くてクリーンなコードも許可する必要があるようです。いくつかのリソース ( http://www.introtorx.com/など) と stackoverflow を読んだ後、非常に簡単な解決策は、https: //stackoverflow.com/a/14464068 のように非同期プログラミングを Rx.Net にブリッジすることです。 /1429390 :
それは主に動作します。バイトを送受信できます。
閉店時間
これは、物事がうまくいかなくなるときです。
ストリームを閉じて、物事をきれいに保つ必要がある場合があります。基本的にこれは、読み取りを停止し、バイト受信オブザーバブルを終了し、新しい接続で新しい接続を開くことを意味します。
たとえば、接続がリモート ホストによって強制的に閉じられると、BeginRead()/EndRead()
すぐにすべての CPU を消費するループがゼロ バイトを返します。高レベルのコードにこれ (高レベルの要素が利用可能なコンテキストでSubscribe()
toを使用) とクリーンアップ (ストリームのクローズと破棄を含む) を認識させます。ReadObservable
これもうまく機能し、 によって返されたオブジェクトの破棄を処理しますSubscribe()
。
時々、ストリームを閉じる必要があります。しかし、明らかにこれにより、非同期読み取りで例外がスローされる必要があります。c# - BeginRead と BeginWrite を途中で中止する適切な方法は? - スタックオーバーフロー
シーケンスを終了さCancellationToken
せるを追加しました。長時間スリープする可能性があるため、Observable.While()
これはこれらの例外を回避するのにあまり役立ちません。BeginRead()
オブザーバブルの未処理の例外により、プログラムが終了しました。提供された .net の検索- 例外の後もサブスクリプションを使用し続ける - スタック オーバーフローは、壊れたものを空のもので効果的に再開するCatchを追加することを提案しました。Observable
コードは次のようになります。
今何?質問
これはうまくいくようです。リモート ホストが接続を強制的に閉じた、または到達できなくなったという条件が検出され、上位レベルのコードが接続を閉じて再試行します。ここまでは順調ですね。
物事が完全に正しいと感じるかどうかはわかりません。
一つには、その行:
命令型コードでの空の catch ブロックの悪い習慣のように感じます。実際のコードは例外をログに記録し、上位レベルのコードは応答がないことを検出して正しく処理するため、かなり問題ないと見なす必要があります (以下を参照)。
また、これは実際、ほとんどの従来のソリューションよりも短くなっています。
解決策は正しいですか、それともよりシンプルでクリーンな方法を見逃していましたか?
Reactive Extensions のウィザードにとって明らかな恐ろしい問題はありますか?
ご清聴ありがとうございました。
c# - 特定の条件に基づいてスライディング ウィンドウで RX 信号を発生させる方法
センサー データの観測可能なホット ストリームがあります。センサー値が一定期間 15 を下回った場合にのみ発火する観測可能な信号が必要です。値が 15 を超えた場合はいつでも、スライディング ウィンドウをリセットする必要があります。以下のコードで部分的に動作するようにしましたが、値が常に 15 未満の場合はトリガーされません。
助言がありますか?
.net - タイムアウト拡張機能内の最新のシーケンス要素を取得するには?
タイムアウトが発生する前に、シーケンス内の最新の要素を取得する最良の方法は何だろうか?
リモート サービスに時々 ping を実行するコードがあり、オフラインになったサービスを特定できるようにしたいと考えています。
Timeout 拡張機能を使用すると、次のようになりました。
これはちょっとうまくいきますが、どのサービスがなくなったかを見つけることはできません。私が欲しいのは、ストリーム内の最新のメッセージを引数として使用して、それが生成するエラー メッセージにいくつかの情報を提供する Timeout 拡張機能です。
タイムアウト拡張機能内の最新のシーケンス要素を取得するにはどうすればよいですか?
c# - MVVM の Rx を使用したコレクションの変更の追跡
私はMVVMパターンを使用して書かれたクロスプラットフォームプロジェクトを持っています(特定のフレームワークは使用されておらず、自己記述の実装のみです)。プロジェクトにはいくつかの独立したモジュールがあり、それぞれに複数のページがあります。各ページには、ViewModel と、データ指向のロジック (取得、保存、削除、変換など) を担当するある種のマネージャーがあります。したがって、データフローは次のようになります。
VM -> マネージャー -> サービス -> マネージャー -> VM
VM がロードされると、マネージャーにデータを要求します。Manager は、サービス呼び出しを実行し、データを取得し、DTO からモデルのコレクションを構築し、このコレクションを ViewModel に返します。ViewModel は、モデルのコレクションを ViewModel のコレクションに変換してリストに表示します。
現在、Rx を使用してこの種のロジックを実装する方法を探しています。ほとんどのページには、編集する 1 つのメイン リスト (アイテムの挿入、削除、変更) と、いくつかのサポート コレクション (値を選択するためのコンボ ボックスのプロバイダー) があります。サポート コレクションは、標準の async/await 呼び出しまたはタスクを Rx に変換することで簡単に取得できます。問題ありません。しかし、変更可能なリストは. Rx ロジックを壊すことなく、ページの存続期間全体にわたってこのリストの変更を追跡する方法がわかりません。購読するオプションがあります:
IEnumerable<Model>
Task<IEnumerable<Model>>
IObservable<IEnumerable<Model>>
IObservable<Model>
しかし、個々の変更を追跡する方法が必要なので、購読する必要があると思います。そして、Add、Delete、Edit などの他のメソッドからこのコレクションを変更する方法が必要です。では、 (または他の方法で)作成し、Manager 内のどこかに保存IObservable
して呼び出すか、または他の方法で保存する必要がありますか?しかし、それはRxの方法のようには見えません。私の問題について何か提案はありますか? アドバイスをいただければ幸いです。ありがとう。Observable.Create
IObserver
OnNext
OnError
PS: Rx は、イベントの無限のストリームではなく、自分で変更をプッシュする必要があるため、変更可能なリストの追跡に関する私の問題を解決するための最良の方法ではないと言うかもしれませんが、Rx にはデータをフィルタリングしてエラーを処理する非常に便利な方法があるので、アプリケーションに実装することを本当に楽しみにしています。
c# - リアクティブソケットとの接続/切断を繰り返すとメモリリークが発生する
GitHub ( https://github.com/clariuslabs/reactivesockets )で見つけたreactivesocketsライブラリを使用していますが、クライアントが接続および切断するときに誰かがメモリリークを経験したかどうか疑問に思っていました.
ライブラリで提供される ReactiveServer サンプルを実行しています。これは基本的に次のもので構成されています。
次に、ループを使用してコンソール アプリケーションを作成しました。このループ内で ReactiveClient が作成され、サーバーに接続され、200 ミリ秒待機してから切断されます。クライアントはサーバーにデータを送信しません。これはコードです:
Visual Studio のメモリ プロファイラーを見ると、テスト アプリケーションを実行すると、サーバーのメモリ使用量が直線的に増加することがわかります。テスト アプリケーションが停止しても、サーバーのメモリ使用量は低下しません。到達したレベルにとどまります。
StringChannel クラスをインスタンス化し、Receiver をサブスクライブするコードを使用せずに ReactiveServer サンプルを実行すると、メモリ リークは発生しないようです (または、メモリ使用量が直線的に増加するため、少なくともそれほど明確ではありません)。コードは次のようになります。
そのため、メモリ リークはprotocol.Receiver.Subscribe
、クライアントが切断されたときにプロトコルのレシーバーへのサブスクリプション (実行時) が適切に破棄されないことに関連している可能性があるのではないかと疑っていました。Rx.NET について詳しく読むと、通常の状況 (監視可能なシーケンスが終了した場合) では、サブスクリプションは自動的に破棄されますが、シーケンスが完了しない場合は破棄されないことを読みました。クライアントが切断されたときにシーケンスが完了せず、サブスクリプションが破棄されていないと思われます。Disconnected イベント ハンドラーでサブスクリプションを破棄しようとしましたが、目立った改善は見られませんでした。
アイデアや提案は大歓迎です。
ありがとう。