rxpy
関数型リアクティブ プログラミング (FRP)のライブラリについて理解を深めようとしていますが、すでに障害にぶつかっています。標準入力 ( ) を介してデータがストリーミングされることを期待する小さなプログラムを作成していますsys.stdin
。
rx.Observable
したがって、私の質問は単純です。標準入力から非同期に読み取るインスタンスを作成するにはどうすればよいですか? Observable
ストリームからインスタンスを作成する組み込みメカニズムはありますか?
rxpy
関数型リアクティブ プログラミング (FRP)のライブラリについて理解を深めようとしていますが、すでに障害にぶつかっています。標準入力 ( ) を介してデータがストリーミングされることを期待する小さなプログラムを作成していますsys.stdin
。
rx.Observable
したがって、私の質問は単純です。標準入力から非同期に読み取るインスタンスを作成するにはどうすればよいですか? Observable
ストリームからインスタンスを作成する組み込みメカニズムはありますか?
を使ったことはありませんRxPy
が、少し慣れていRxJS
ます。
RxPy
この目的で使用できる組み込みメソッドが多数ありますが、私は Observable ファクトリを作成する傾向があります。私ObservableCreation.from_array
たちのガイドとして、今それを試してみましょう。(注: 私はこのコードを実行していませんが、ほとんどの場合は実行できるはずです)
from rx.observable import Observable, ObservableMeta
from rx.anonymousobservable import AnonymousObservable
from rx.concurrency import current_thread_scheduler
class ObservableFile(Observable, metaclass=ObservableMeta):
@classmethod
def from_file(cls, readableFile, scheduler=None):
scheduler = scheduler or current_thread_scheduler
def subscribe(observer):
def action(action1, state=None):
try:
observer.on_next(readableFile.next())
action1(action)
except StopIteration: # EOF
observer.on_completed()
return scheduler.schedule_recursive(action)
return AnonymousObservable(subscribe)
次に、次のように使用します。
res = rx.Observable.from_file(sys.stdin)
これにより、EOF まで stdin の各行にオブザーバブルが作成されます。ブロックしていますが、それを回避する方法があります。また、別のスケジューラーで調整することもできます。
私はちょうど今日これで遊んでいて、
d = rx.Observable.from_(sys.stdin).subscribe(print)
動作しているように見えます (行を標準出力にエコーします)。from_
のエイリアスですfrom_iterable
。
d
購読を解除する使い捨てです。