現在、呼び出し可能なオブジェクトを rxpy オブザーバブルにラップして、新しい値を処理する必要があることをオブザーバーに通知しようとしていますが、これを簡単に行う方法がわかりません。比較的経験豊富な C++/Qt 開発者です。
yield
キーワードを使用して、次のようなサンプルを機能させることができましたが、これはほとんどの状況ではあまり便利ではありません。
import subprocess
import rx
def run_ping():
p = subprocess.Popen(["ping", "8.8.8.8"], stdout=subprocess.PIPE, universal_newlines=True)
for line in iter(p.stdout.readline, ""):
yield line
source = rx.from_iterable(run_ping())
source.subscribe(lambda s : print(f"!! {s}"))
しかし、次の例を取ると:
import subprocess
import rx
class MyObservable:
def __init__(self, my_list: list):
self.i = 0
self.l = my_list
def __call__(self, **kwargs):
obs = kwargs.get('obs', None)
scheduler = kwargs.get('scheduler', None)
i = self.i
self.i = self.i + 1
if obs is None:
return self.l[i]
if i == len(self.l):
obs.on_next(-1)
else:
obs.on_next(self.l[i])
obs.on_completed()
o = MyObservable([1, 2, 3, 4])
source = rx.from_callable(o)
source.subscribe(lambda s : print(f"!! {s}"))
!! 1
私は出力としてのみ取得し2
、 処理3
さ4
れることはなく、オブザーバーにそれらを処理させる方法が実際にはわかりませんが、さらに、MyObservable
リストに値を追加するメソッドを実装する方法がわかりません。オブザーバーでの処理をトリガーします。
Observable
から何か起動できるのではないかと思い、 の API を調べてみたのsource
ですが、そうではないようです。
Qt と同じロジックを適用しようとしてrx
/の哲学に明らかな何かが欠けているのでしょうか、それともそれを行う簡単な方法がありますか、それとも私の問題を解決するには、私のニーズに合った特殊化を実装する必要がありますか? ?rxpy
Observable
とにかく、答えてくれてありがとう!
EDIT :私は完全に私を満足させない方法を見つけました.anとanのSubject
両方であるクラスを使用して、そのメソッドを使用して、新しいデータがここにあることを外部サブスクライバーに通知できます:Observable
Observer
on_next
import subprocess
import rx
from rx.subject import Subject
class MyBetterObservable:
def __init__(self):
self.i = 0
self.l = []
self.subject = Subject()
def subscribe(self, f):
self.subject.subscribe(f)
def append(self, i):
self.l.append(i)
self.subject.on_next(i)
o = MyBetterObservable()
o.subscribe(lambda s : print(f"!! {s}"))
o.append(1)
o.append(17)
このサンプルは、私が期待するものを示しています:
!! 1
!! 17
しかし、これは間違いなく私たちがこの種のことをすることを期待する方法ですか、それともオブジェクトrxpy
をラップするよりもエレガントな方法はありますか?Subject