6

ボタンが押されるたびにブール値を発行するプロセッサがあるとしましょう。これをトグルと考えてください。

    boolean gateValue = true;
    PublishProcessor<Boolean> gate = PublishProcessor.create();
    view.onButtonClicked()
            .subscribe(new Action1<Void>() {
                @Override
                public void call(final Void aVoid) {
                    gate.onNext(gateValue = !gateValue);
                }
            }));

私がやりたいことは、ゲートの値を使用して監視可能なシーケンスを一時停止および再開し、一時停止中に放出された値をバッファリングすることです。

私はこれをよく読みましたが、他の言語のリアクティブ拡張機能では可能だと思われますが、RxJava はそれをサポートしていないようです。

これは私が達成したいことの例です。毎秒増分値を出力するだけです。ボタンを押すと、もう一度押すまで出力を停止し、2 つのボタンを押す間に発行されたすべてのアイテムを出力する必要があります。

Flowable.interval(1, TimeUnit.SECONDS)
                .bufferWhile(gate)
                .flatMapIterable(longs -> longs)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(final Long aLong) throws Exception {
                        view.displayTime(aLong);
                    }
                });

このようなことを達成する方法を知っている人はいますか?

編集これを達成する方法についてブログ投稿を書きましたhttps://medium.com/@scottalancooper/pausing-and-resuming-a-stream-in-rxjava-988a0977b771#.gj7fsi1xk

4

3 に答える 3

6

valve()RxJava2Extensions ライブラリに、要求された動作を実行するオペレーターが追加されました。

于 2016-11-16T21:26:41.730 に答える
1

これは、デフォルトの RxJava オペレーターを使用して実現できます。

    final Random random = new Random();
    final Observable<Boolean> gates = Observable.interval(10, TimeUnit.SECONDS)
            .map(it -> random.nextBoolean())
            .startWith(false)
            .doOnNext(it -> System.out.println("Gate " + (it ? "opened" : "closed")))
            .take(100);
    final Observable<Long> data = Observable.interval(3, TimeUnit.SECONDS)
            .doOnNext(it -> System.out.println("New value " + it))
            .take(100);

    gates.publish(innerGates -> data.publish(innerData -> Observable.merge(
            innerData.window(
                    innerGates.distinctUntilChanged().filter(it -> it),
                    (ignore -> innerGates.distinctUntilChanged().filter(it -> !it))
            ).flatMap(it -> it),
            innerData.buffer(
                    innerGates.distinctUntilChanged().filter(it -> !it),
                    (ignore -> innerGates.distinctUntilChanged().filter(it -> it))
            )
                    .flatMap(Observable::from)
    )))
            .subscribe(it -> System.out.println("On next " + it));
于 2018-04-06T16:31:24.297 に答える