3

rx Java でバックプレッシャーを使用して、Android アプリケーションで無限スクロールを作成しようとしています。外部サービスを要求された回数だけ呼び出すようにしたい(呼び出した後request(1))。しかし、flatmap を使用した後は、すべてsubscribe16 ページが読み込まれます。

私のコードの下に期待される結果があります。最初のリクエストが原因でほぼすべてのテストが失敗します (n=16 の場合)

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import rx.Observable;
import rx.observers.TestSubscriber;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyList;
import static org.mockito.Mockito.*;
import static rx.internal.util.UtilityFunctions.identity;

public class ServiceObservablesTest {


    public static <T> Observable<List<T>> create(DataProvider<T> dataProvider) {
        Observable<Observable<List<T>>> metaObservalble = Observable.create(subscriber -> {
            AtomicInteger pageNumber = new AtomicInteger();
            subscriber.setProducer(n -> {
                // at subscribe rxJava makes request for 16 elements - probably because of flatMap
                // after first request with 16 elements everything seems to work fine even if i ignore the 'n' param

                Observable<List<T>> page = dataProvider.requestPage(pageNumber.getAndIncrement());
                subscriber.onNext(page);

            });
        });
        return metaObservalble.flatMap(identity()).takeWhile(page -> !page.isEmpty());
    }

    public interface DataProvider<T> {
        Observable<List<T>> requestPage(int page);
    }


    private DataProvider provider;

    @Before
    public void setUp() throws Exception {
        provider = Mockito.mock(DataProvider.class);
        List<Object> list = Arrays.asList(new Object());
        when(provider.requestPage(anyInt())).thenReturn(Observable.just(list));
    }

    @Test
    public void shouldRequestOnlyFirstPageOnSubscribe() {
        //given

        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);

        //then
        subscriber.assertValueCount(1);
        subscriber.assertNotCompleted();

        verify(provider, times(1)).requestPage(0);
        verify(provider, never()).requestPage(1);
    }


    @Test
    public void shouldRequestNumberOfPagesSpecified() {
        //given

        int requested_pages = 5;
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(0);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);
        subscriber.requestMore(requested_pages);

        //then
        subscriber.assertValueCount(requested_pages);
        subscriber.assertNotCompleted();


        for (int i = 0; i < requested_pages; i++) {
            verify(provider, times(1)).requestPage(i);
        }
        verify(provider, never()).requestPage(requested_pages);

    }


    @Test
    public void shouldCompleteAfterRetrievingEmptyResult() {
        //given

        int emptyPage = 2;
        when(provider.requestPage(emptyPage)).thenReturn(Observable.just(emptyList()));

        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(100);
        Observable<List<Object>> flightsObservable = create(provider);


        //when
        flightsObservable.subscribe(subscriber);

        //then
        subscriber.assertValueCount(emptyPage);
        subscriber.assertCompleted();


        verify(provider, times(1)).requestPage(0); //requested at subscribe
        for (int i = 1; i <= emptyPage; i++) {
            verify(provider, times(1)).requestPage(i);
        }
        verify(provider, never()).requestPage(emptyPage + 1);

    }

    @Test
    public void shouldRequestNextPageWhenRequestedMore() {
        //given

        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);
        subscriber.requestMore(1);

        //then
        subscriber.assertValueCount(2);
        verify(provider, times(1)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, never()).requestPage(2);

        //when
        subscriber.requestMore(1);

        //then
        subscriber.assertValueCount(3);
        subscriber.assertNotCompleted();

        verify(provider, times(1)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, times(1)).requestPage(2);
        verify(provider, never()).requestPage(3);

    }

    @Test
    public void shouldWorkWithMultipleSubscribers() {

        //given

        TestSubscriber<List<Object>> subscriber1 = new TestSubscriber<>(1);
        TestSubscriber<List<Object>> subscriber2 = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber1);
        flightsObservable.subscribe(subscriber2);

        //then
        subscriber1.assertValueCount(1);
        subscriber2.assertValueCount(1);

        verify(provider, times(2)).requestPage(0);
        verify(provider, never()).requestPage(1);

        //when
        subscriber1.requestMore(1);
        //then
        subscriber1.assertValueCount(2);
        subscriber2.assertValueCount(1);

        verify(provider, times(2)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, never()).requestPage(2);

        //when
        subscriber2.requestMore(1);
        //then
        subscriber1.assertValueCount(2);
        subscriber2.assertValueCount(2);

        verify(provider, times(2)).requestPage(0);
        verify(provider, times(2)).requestPage(1);
        verify(provider, never()).requestPage(2);
    }

}
4

1 に答える 1

3

バック プレッシャは、コンシューマ プロデューサーの同時動作をネゴシエートし、生​​成されるデータのレートが消費されるデータのレートを超えた場合に何をすべきかを解決するための戦略をプログラム作成者が設定できるようにすることを目的としています。

そうは言っても、 などのオブザーバブルを結合するオペレーターは、merge必要なデータ量に対応しない要求量を提供することがわかります。外側のオブザーバブル (オブザーバブルのオブザーバブル) は、マージ時に RxAndroid では常に 16 (RxJava では 128) のリクエストを受け取ります。次に、List の内部 Observable を受け取ると、各内部 Observable は、ダウンストリーム サブスクライバーから要求された量に基づく要求を受け取ります。を記述しようとすると、マージ動作を内部的に管理する関数Observable<Observable<T>>を記述して、 の代わりに を使用する必要があります。これを記述すると、データ プロバイダーから返されたオブザーバブルをサブスクライブして、.OnSubscribe<Observable<List<T>>>Observable<List<T>>Observable<Observable<List<T>>List<T>

代わりに、画面の y 位置を End-Of-Page イベントにマップし、スキャンを使用してそれを単調に増加する数値に変換し、その数値を concatMap への呼び出しに変換することをお勧めしますDataProvider.requestPage()

screenYPositions
    .map(this::isUninitializedOrNearEndOfPage)
    .scan(1, (event, pageNumber) -> pageNumber + 1 )
    .concatMap(dataProvider::requestPage)
    .subscribe(testSubscriber);
于 2015-08-12T18:54:38.033 に答える