23

Android アプリのデータを更新するために、RESTful エンドポイントを定期的にポーリングする必要があります。また、接続に基づいて一時停止して再開する必要があります (電話がオフラインの場合は、試す必要さえありません)。私の現在のソリューションは機能していますがScheduledExecutorService、定期的なタスクを実行するために標準の Java を使用していますが、Rx パラダイムにとどまりたいと考えています。

これが私の現在のコードです。簡潔にするために一部を省略しています。

userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() {
    @Override
    public void call(final Subscriber<? super UserProfile> subscriber) {
        final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        final Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // making http request here
            }
        };
        final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1);
        networkStatusObservable.subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean networkAvailable) {
                if (!networkAvailable) {
                    pause();
                } else {
                    pause();                        
                    futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS));
                }
            }

            private void pause() {
                for (ScheduledFuture<?> future : futures) {
                    future.cancel(true);
                }
                futures.clear();
            }
        });

        final Subscription subscription = new Subscription() {
            private boolean isUnsubscribed = false;

            @Override
            public void unsubscribe() {
                scheduledExecutorService.shutdownNow();
                isUnsubscribed = true;
            }

            @Override
            public boolean isUnsubscribed() {
                return isUnsubscribed;
            }
        };
        subscriber.add(subscription);
    }
}).multicast(BehaviorSubject.create()).refCount();

networkStatusObservableは基本的に にラップされたブロードキャスト レシーバObservable<Boolean>であり、電話がネットワークに接続されていることを示します。

私が言ったように、このソリューションは機能していますが、定期的なポーリングと新しいUserProfiles の発行には Rx アプローチを使用したいと考えています。Observable.timerとについては知ってObservable.intervalいますが、それらをこのタスクに適用する方法がわかりません (そして、それらを使用する必要があるかどうかはわかりません)。

4

5 に答える 5

29

この GitHub の問題には、役立つと思われるアプローチがいくつかあります。

https://github.com/ReactiveX/RxJava/issues/448

3 つの実装は次のとおりです。


Observable.interval

Observable.interval(delay, TimeUnit.SECONDS).timeInterval()
        .flatMap(new Func1<Long, Observable<Notification<AppState>>>() {
            public Observable<Notification<AppState>> call(Long seconds) {
                return lyftApi.updateAppState(params).materialize(); } });

Scheduler.schedulePeriodically

Observable.create({ observer ->
        Schedulers.newThread().schedulePeriodically({
            observer.onNext("application-state-from-network");
        }, 0, 1000, TimeUnit.MILLISECONDS);
    }).take(10).subscribe({ v -> println(v) });

手動再帰

Observable.create(new OnSubscribeFunc<String>() {
        @Override
        public Subscription onSubscribe(final Observer<? super String> o) {
            return Schedulers.newThread().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
                @Override
                public Subscription call(Scheduler inner, Long t2) {
                    o.onNext("data-from-polling");
                    return inner.schedule(t2, this, 1000, TimeUnit.MILLISECONDS);
                }
            });
        }
    }).toBlockingObservable().forEach(new Action1<String>() {
        @Override
        public void call(String v) {
            System.out.println("output: " + v);
        }
    });

結論として、次の実行をスケジュールする前に操作が完了するまで待機するため、手動再帰が適しているということです。

于 2015-02-10T19:22:44.397 に答える
1

さて、私は自分の解決策を投稿します。おそらく誰かがそれから恩恵を受けるでしょう。質問に関連する部分のみを投稿し、HTTP とキャッシュに関するものは省略します。これが私がそれを行う方法です:

private ConnectableObservable<Long> createNetworkBoundHeartbeatObservable(final Observable<Boolean> networkStatusObservable,
                                                                          final Observable<Boolean> pauseResumeObservable) {

    final Observable<Boolean> pausableHeartbeatObservable = Observable.combineLatest(networkStatusObservable, pauseResumeObservable,
            new Func2<Boolean, Boolean, Boolean>() {
                @Override
                public Boolean call(Boolean networkAvailable, Boolean mustPause) {
                    return mustPause && networkAvailable;
                }
            }
    ).distinctUntilChanged();

    final Observable<Boolean> hasToResumeObservable = pausableHeartbeatObservable.filter(new Func1<Boolean, Boolean>() {
        @Override
        public Boolean call(Boolean networkAvailable) {
            return networkAvailable;
        }
    });
    final Observable<Boolean> hasToStopObservable = pausableHeartbeatObservable.filter(new Func1<Boolean, Boolean>() {
        @Override
        public Boolean call(Boolean networkAvailable) {
            return !networkAvailable;
        }
    });


    return pausableHeartbeatObservable.concatMap(new Func1<Boolean, Observable<Long>>() {
        @Override
        public Observable<Long> call(Boolean shouldResumeRequests) {
            if (shouldResumeRequests) {
                long timeToUpdate;
                final Date oldestModifiedExpiresAt = cache.oldestModifiedExpiresAt();
                timeToUpdate = Math.max(0, oldestModifiedExpiresAt.getTime() - System.currentTimeMillis());
                Log.d(TAG, String.format("Have to restart updates, %d seconds till next update", timeToUpdate / SECOND_IN_MILLIS));
                return Observable
                        .timer(timeToUpdate, SECONDS_TO_EXPIRE * SECOND_IN_MILLIS, TimeUnit.MILLISECONDS)
                        .takeUntil(hasToStopObservable);
            } else {
                Log.d(TAG, "Have to pause updates");
                return Observable.<Long>never().takeUntil(hasToResumeObservable);
            }
        }
    }).multicast(PublishSubject.<Long>create());
}

ご覧のとおり、更新を一時停止または再開する条件はもう少し複雑になり、アプリがバックグラウンドに移行したときの一時停止をサポートするために新しい Observable が追加されています。

次に、ソリューションの中心にあるのconcatMapは、順次発行する操作ですObservables(したがって、flatMap ではなく concatMap です。この質問を参照してください: RxJava での concatMap と flatMap の違いは何ですか)。更新を続行するか一時停止するかによって、 または のintervalいずれかを出力します。never Observables次に、放出されるすべてのObservableものがtakenUntil「反対」であり、Observable新しい値を放出します。

ConnectableObservableObservable作成されたものがホットであり、対象のすべてのサブスクライバーが何かを発行し始める前にそれをサブスクライブする必要があるため、返されます。そうしないと、最初のイベントが失われる可能性があります。connect私は後でそれを呼び出します。

もしあれば、投票に基づいて私または別の回答を受け入れます。

于 2014-07-23T10:17:55.580 に答える