1

いくつかのファイルをダウンロードする処理キューを RxJava に実装したいと考えています。ダウンロードしたいファイルの数は、最大で 100 程度です。

すべてが RxJava 1.1.1 を使用して Android で開発されています

私の現在の実装は次のようになります。

PublishSubject<URL> publishSubject = PublishSubject.create();
_subject = new SerializedSubject<>(publishSubject);

_subscription = _subject
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(_getObserver());  // Observer

_subject.onBackpressureBuffer(10000, new Action0() {
    @Override
    public void call() {
        Log.d(TAG, "onBackpressureBuffer called");
        return;
    }
});

// download a file
_subject.onNext(aValidURL);

Where_getObserver()は、「onNext」メソッドでファイルにダウンロードする新しいオブザーバー オブジェクトを返します。

しかし、私の問題は、MissingBackpreasureException理解できない をすぐに取得することです。を実装しようとしましたbackpreasurebufferが、呼び出されていないようです。

私は何を間違っていますか?

4

1 に答える 1

2

RxJava では、オペレーターを適用すると、動作が変更された新しい Observable インスタンスが取得されますが、元のインスタンスは同じままです。ここでは、 を呼び出しonBackpressureBufferました_subjectが、その結果を使用しないでください。それ以外の場合、呼び出しは何もしません。それを順番に適用する必要があります:

PublishSubject<URL> publishSubject = PublishSubject.create();
_subject = new SerializedSubject<>(publishSubject);

_subscription = _subject
                .onBackpressureBuffer(10000, new Action0() {
                    @Override
                    public void call() {
                        Log.d(TAG, "onBackpressureBuffer called");
                        return;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(_getObserver());  // Observer

// download a file
_subject.onNext(aValidURL);
于 2016-02-22T11:27:35.767 に答える