問題タブ [rx-py]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - asyncio を使用して別のスレッドで RxPY オブザーバーに通知するにはどうすればよいですか?
(注: この問題の背景はかなり詳細ですが、スキップできる SSCCE が一番下にあります)
バックグラウンド
Web サービスとやり取りするための Python ベースの CLI を開発しようとしています。私のコードベースにはCommunicationService
、Web サービスとのすべての直接通信を処理するクラスがあります。Web サービスから応答が返されたときに通知を受けるために、他のオブジェクトがサブスクライブできる (RxPY からの) をreceived_response
返すプロパティを公開します。Observable
click
サブコマンドの 1 つが以下のように実装されているライブラリに基づいて CLI ロジックを作成しました。
ここで何が起こっているか (そうでresponse_handler
ない場合) は、サブコマンドが Web サービス ( ) からの応答を待機し、処理できる最初の応答から処理された値を返すNone
コルーチンとして動作していることです。self.on_response == CommunicationService.received_response
CommunicationService
完全にモックされたテストケースを作成して、CLI の動作をテストしようとしています。偽物Subject
が作成され( として機能できますObservable
)、CommunicationService.received_response
モックされて返されます。テストの一環として、サブジェクトのon_next
メソッドが呼び出され、モック Web サービスの応答が本番コードに返されます。
CLI 呼び出しの最後に呼び出され、コルーチン (サブコマンド) が完了するまでブロックするクリックの「結果コールバック」関数を使用します。
問題
テストの開始時に、CliRunner.invoke
シバン全体を発射するために走ります。問題は、これがブロッキング呼び出しであり、CLI が終了して結果を返すまでスレッドをブロックすることです。これは、テスト スレッドを続行する必要がある場合には役に立ちません。これにより、テスト スレッドと同時に Web サービスの応答のモックを生成できます。
私がする必要があると思うのはCliRunner.invoke
、を使用して新しいスレッドで実行することThreadPoolExecutor
です。これにより、テスト ロジックを元のスレッドで続行し、@when
上記のステップを実行できます。ただし、で発行された通知は mock_received_response_subject.on_next
、実行をトリガーしてサブコマンド内で続行するようには見えません。
解決策には RxPY のAsyncIOScheduler
.
SSCCE
以下のスニペットは、問題の本質であることを願っています。動作するように変更できる場合は、同じソリューションを実際のコードに適用して、希望どおりに動作させることができるはずです。
現在の動作
実行時にプログラムがハングし、 でブロックされresult = loop.run_until_complete(task)
ます。
合否基準
プログラムが終了し、 に出力foo
されstdout
ます。
更新 1
Vincent の助けに基づいて、コードにいくつかの変更を加えました。
Relay.enabled
(Web サービスからの応答を処理するために待機するサブコマンド) は、次のように実装されています。
await
オブザーバブルでどのように動作するかはよくわかりませんでしたRxPY
-生成された各要素で呼び出し元に実行を返すか、オブザーバブルが完了した(またはエラーになった?)場合にのみ実行します。正直なところ、後者の方がより自然な選択のように感じられ、この関数の実装をよりエレガントで反応的に感じさせることができました。
モック Web サービス応答を生成するテスト ステップも変更しました。
残念ながら、CLI は独自のスレッドで呼び出されているため、これはそのままでは機能しません...
また、CLI は、呼び出されると独自のスレッド プライベート イベント ループを作成します...
私が必要だと思うのは、テスト ステップが新しいスレッドで CLI を呼び出し、使用しているイベント ループをフェッチできるようにする方法です。
更新 2
特定のスレッドがそれ自体で作成して使用するイベント ループを取得する簡単な方法はないようです。そのため、代わりに、Victor のアドバイスを受けasyncio.new_event_loop
て、テスト コードが作成して保存するイベント ループを返すようにモックしました。
'mock web response received' テスト ステップを次のように変更します。
Relay.enabled
素晴らしいニュースは、このステップが実行されると、実際にコルーチンがトリガーされることです!
現在の唯一の問題は、CLI を独自のスレッドで実行することで得られる将来を待ち、CLI がこれを送信していることを検証する最終テスト ステップですstdout
。
私はこれをいじってみましたが、うまく移行して結果を返すことができないcontext.async_result
ようloop.run_in_executor
ですdone
。現在の実装では、最初のテスト ( ) でエラーが発生し、2 番目のテスト ( 1.1
) で無期限にハングし1.2
ます。
第3章:フィナーレ
このすべての非同期マルチスレッドのものを台無しにしてください。私はそれにはあまりにも愚かです。
まず、このようなシナリオを説明する代わりに...
次のように説明します。
新しい特定のステップを実装します。
ブーム
python - Reactive 拡張機能の create で repeat(n) が機能しない理由
与える
74 78 94 59 79 76
シーケンス、各数字が6回繰り返されることを期待していますが
そのため、create メソッドで作成されたオブザーバブルに対して「繰り返し」が機能することはありません。
python-3.x - オブジェクトのプロパティが RxPy で変更されたときに関数をトリガーする
RxPy では、ここでINotifyPropertyChanged
言及されている .NET フレームワークに似たものはありますか? オブジェクトにオブザーバーを追加しようとしているため、オブジェクトのプロパティが変更され、関数が呼び出されます。
python - buffer_with_count の動作は、オブザーバブルの間隔とオブザーバブルの範囲では異なります。なんで?
RxPyを試していますが、buffer_with_count
オペレーターのこの動作を理解していません。
シナリオ 1: インターバル オブザーバブル
これは私が期待するように動作します。
バッファなし
バッファあり (count=3)
シナリオ 2: 観測可能な範囲
これは、期待されるバッファリングされた出力を生成しません
バッファなし
バッファあり (count=3)
この 2 番目のシナリオでは何が起こっていますか?
どうもありがとう!
python - Rx - リアルタイムでの株価変動のトリガー?
Observable
クラスの例として、特定の株式ティッカーの現在の価格を発行するホットを作成する最も堅牢な方法を見つけようとしていますが、変更されたときにのみ価格を発行します。私が自分で思いついた最善の方法は、 Observable
Google Finance に 3 秒ごとにクエリを実行し、価格を解析し、値が を介して変更された場合にのみそれをプッシュする間隔ソースを作成することですdistinct_until_changed()
。
これを行うためのより効率的な方法を知っている人はいますか?間隔を空けて再クエリする必要はありませんが、価格が実際に変化したときにソースをトリガーしますか? 非常に複雑で特殊なライブラリが必要な場合は、私が持っているものを使い続けます。これを行うためのはるかに良い方法が欠けているように感じます...
python - RxPy オブザーバブルからの pyqtgraph プロットの更新
次のスクリプトでは、プロット ウィンドウが作成され、values
正しく に渡されplot.update
ます。ただし、プロットは更新されません。私は何を間違っていますか?