14

rxjava を使用してサンプルをビルドしようとしています。このサンプルは、 ReactiveWareService と ReactiveReviewService を調整して WareAndReview コンポジットを返す必要があります。

ReactiveWareService
        public Observable<Ware> findWares() {
        return Observable.from(wareService.findWares());
    }

ReactiveReviewService: reviewService.findReviewsByItem does a ThreadSleep to simulate a latency!

public Observable<Review> findReviewsByItem(final String item) {
return Observable.create((Observable.OnSubscribe<Review>) observer -> executor.execute(() -> {
    try {
        List<Review> reviews = reviewService.findReviewsByItem(item);
        reviews.forEach(observer::onNext);
        observer.onCompleted();
    } catch (Exception e) {
        observer.onError(e);
    }
}));
}

public List<WareAndReview> findWaresWithReviews() throws RuntimeException {
final List<WareAndReview> wareAndReviews = new ArrayList<>();

wareService.findWares()
    .map(WareAndReview::new)
.subscribe(wr -> {
        wareAndReviews.add(wr);
        //Async!!!!
        reviewService.findReviewsByItem(wr.getWare().getItem())
            .subscribe(wr::addReview,
                throwable -> System.out.println("Error while trying to find reviews for " + wr)
            );
    }
);

//TODO: There should be a better way to wait for async reviewService.findReviewsByItem completion!
try {
    Thread.sleep(3000);
} catch (InterruptedException e) {}

return wareAndReviews;
}

Observable を返したくないという事実を考えると、非同期 Observable (findReviewsByItem) が完了するのをどのように待つことができますか?

4

3 に答える 3

18

例のほとんどは、うまく連携する標準の RxJava 演算子で書き直すことができます。

public class Example {

    Scheduler scheduler = Schedulers.from(executor);

    public Observable<Review> findReviewsByItem(final String item) {
        return Observable.just(item)
               .subscribeOn(scheduler)
               .flatMapIterable(reviewService::findReviewsByItem);
    }
    public List<WareAndReview> findWaresWithReviews() {
        return wareService
               .findWares()
               .map(WareAndReview::new)
               .flatMap(wr -> {
                   return reviewService
                          .findReviewsByItem(wr.getWare().getItem())
                          .doOnNext(wr::addReview)
                          .lastOrDefault(null)
                          .map(v -> wr);
               })
               .toList()
               .toBlocking()
               .first();
    }
}

このようなサービスを構成したいときはいつでも、flatMap最初に考えてください。sub-Observable ごとにブロックする必要はありませんがtoBlocking()、本当に必要な場合は最後にのみブロックしてください。

于 2015-08-13T21:07:03.897 に答える
13

BlockingObservable のメソッドを使用できますhttps://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operatorsを参照してください。例えば

BlockingObservable.from(reviewService.findReviewsByItem(..)).toIterable()
于 2014-06-23T13:33:25.203 に答える
-5

別の方法は、開始する前に CountdownLatch を宣言することです。次に、onCompleted() のそのラッチで countDown() を呼び出します。その後、そのラッチで Thread.sleep() を await() に置き換えることができます。

于 2015-08-12T22:37:11.960 に答える