0

(B)が新しい を持つたびにそのアイテムを放出する( Observable<Item> A)があります。PublishSubject<Item> Item

Aは次のように使用されます。A.subscribeOn(computationScheduler).observeOn(mainThread)


目標: Aからの完全なストリームを計算スケジューラで実行し、メインスレッドで結果を消費します。

実際: Bが観察される場所に応じて、ストリーム全体が異なるスケジューラで実行されます。.subscribeOn()以下の例では、呼び出しを介してもメインスレッドで。

A からの完全なストリームが特定のスケジューラでその作業を実行し、別のスケジューラで結果をディスパッチするにはどうすればよいですか? A.compose() はうまくいきません。


実際のコード:

class SomeClass

private final PublishSubject<ExerciseQueryOptions> queryOptionsPublishSubject = PublishSubject.create();

@NonNull
@Override
public Observable<List<ExerciseViewModel>> call() {
    return queryOptionsPublishSubject
            .startWith(createQueryOptions())
            .distinctUntilChanged()
            .flatMap(new Function<ExerciseQueryOptions, ObservableSource<List<ExerciseViewModel>>>() {
                @Override
                public ObservableSource<List<ExerciseViewModel>> apply(ExerciseQueryOptions queryOptions) throws Exception {
                    //This is the code I want to run on a given scheduler,
                    //Supplied by the code that calls this .call() method.
                    return datastore.queryAll(ExerciseModel.class, true)
                            .map(transformer);                      
                }
            });
}

//Other class
SomeClass A; 

A.subscribeOn(Schedulers.computation()).observeOn(AndroidScheduers.mainThread())
.subcribe(...);
4

1 に答える 1