11

リアクティブ プログラミングと RxJava を調査しています。楽しいのですが、答えが見つからない問題に行き詰まっています。私の基本的な質問: それ以外の場合は無限に実行されている Observable を終了するための反応に適した方法は何ですか? また、私のコードに関する批評や反応的なベスト プラクティスも歓迎します。

演習として、ログ ファイルの末尾のユーティリティを作成しています。ログ ファイル内の行の流れは、Observable<String>. ファイルに追加されたテキストの読み取りを続行するBufferedReaderには、通常のreader.readLine() == null終了チェックを無視し、代わりに、スレッドがスリープしてロガー テキストを待つ必要があることを意味すると解釈します。

しかし、 を使用してオブザーバーを終了することはできますがtakeUntil、それ以外の場合は無限に実行されるファイル ウォッチャーを終了するクリーンな方法を見つける必要があります。私は自分のterminateWatcherメソッド/フィールドを書くことができますが、それは Observable/Observer のカプセル化を壊します - そして私は可能な限りリアクティブパラダイムに厳密にとどまりたいです.

Observable<String>コードは次のとおりです。

public class FileWatcher implements OnSubscribeFunc<String> {
    private Path path = . . .;

    @Override
    // The <? super String> generic is pointless but required by the compiler
    public Subscription onSubscribe(Observer<? super String> observer) {
        try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
            String newLine = "";
            while (!Thread.interrupted()) {  // How do I terminate this reactively?
                if ((newLine = reader.readLine()) != null)
                    observer.onNext(newLine);
                else
                    try {
                        // Wait for more text
                        Thread.sleep(250);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
            }
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }

        return null;  // Not sure what Subscription I should return
    }
}

以下は、新しい行が来るたびに出力する Observer コードです。

public static void main(String... args) {
    . . .
    Observable<String> lines = Observable.create(createWatcher(file));
    lines = lines.takeWhile(new Func1<String, Boolean>() {
        @Override
        public Boolean call(String line) {
            // Predicate for which to continue processing
            return !line.contains("shutdown");
        }
    }).subscribeOn(Schedulers.threadPoolForIO())
            .observeOn(Schedulers.currentThread());
    // Seems like I should use subscribeOn() and observeOn(), but they
    // make my tailer terminate without reading any text.

    Subscription subscription = lines.subscribe(new Action1<String>() {
        @Override
        public void call(String line) {
             System.out.printf("%20s\t%s\n", file, line);
        }
    });
}

私の2つの質問は次のとおりです。

  1. そうでなければ無限に実行されているストリームを終了するための反応的で一貫した方法は何ですか?
  2. 私のコードであなたを泣かせる他の間違いは何ですか? :)
4

1 に答える 1

6

Subscriptionサブスクリプションが要求されたときにファイル監視を開始しているため、サブスクリプションが終了したとき、つまりが閉じられたときに終了するのが理にかなっています。これは、コード コメントのルーズ エンドの 1 つに対処します。ファイルの監視を停止するようSubscriptionに に指示する a を返します。FileWatcher次に、現在のスレッドが中断されたかどうかをチェックする代わりに、サブスクリプションがキャンセルされたかどうかをチェックするようにループ条件を置き換えます。

onSubscribe()問題は、メソッドが返されない場合、それはあまり役に立たないことです。おそらくFileWatcher、スケジューラを指定する必要があり、そのスケジューラで読み取りが行われます。

于 2013-12-18T18:48:39.497 に答える