5

rxpy関数型リアクティブ プログラミング (FRP)のライブラリについて理解を深めようとしていますが、すでに障害にぶつかっています。標準入力 ( ) を介してデータがストリーミングされることを期待する小さなプログラムを作成していますsys.stdin

rx.Observableしたがって、私の質問は単純です。標準入力から非同期に読み取るインスタンスを作成するにはどうすればよいですか? Observableストリームからインスタンスを作成する組み込みメカニズムはありますか?

4

2 に答える 2

4

を使ったことはありませんRxPyが、少し慣れていRxJSます。

RxPyこの目的で使用できる組み込みメソッドが多数ありますが、私は Observable ファクトリを作成する傾向があります。私ObservableCreation.from_arrayたちのガイドとして、今それを試してみましょう。(注: 私はこのコードを実行していませんが、ほとんどの場合は実行できるはずです)

from rx.observable import Observable, ObservableMeta
from rx.anonymousobservable import AnonymousObservable
from rx.concurrency import current_thread_scheduler

class ObservableFile(Observable, metaclass=ObservableMeta):

    @classmethod
    def from_file(cls, readableFile, scheduler=None):
        scheduler = scheduler or current_thread_scheduler

        def subscribe(observer):
            def action(action1, state=None):
                try:
                    observer.on_next(readableFile.next())
                    action1(action)

                except StopIteration: # EOF
                    observer.on_completed()

            return scheduler.schedule_recursive(action)
        return AnonymousObservable(subscribe)

次に、次のように使用します。

res = rx.Observable.from_file(sys.stdin)

これにより、EOF まで stdin の各行にオブザーバブルが作成されます。ブロックしていますが、それを回避する方法があります。また、別のスケジューラーで調整することもできます。

于 2014-07-31T06:37:18.183 に答える
3

私はちょうど今日これで遊んでいて、

 d = rx.Observable.from_(sys.stdin).subscribe(print)

動作しているように見えます (行を標準出力にエコーします)。from_のエイリアスですfrom_iterabled購読を解除する使い捨てです。

于 2016-03-22T21:11:41.800 に答える