3

Observableアプリケーション間で共有されるタイマーを実装しようとしActivitiesています。私はそれぞれに注入するダガーシングルトンであるクラスで実装を行っていPresenterますActivity

そのように、Observable を一度作成します。

Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILLISECONDS).map(t -> this::doSomethingCool()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .share();

次の関数を使用して、プレゼンターからサブスクライブします。

public Observable<Status> register(Callback callback) {

    PublishSubject<Status> subject = PublishSubject.create();
    subject.subscribe(status -> {},
            throwable -> L.LOGE(TAG, throwable.getMessage()),
            () -> callback.onStatusChanged(mBasketStatus));

    mObservable.subscribe(subject);
    basketCounterCallback.onStatusChanged(status));

    subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
    return subject.asObservable();
}

私はそれをすべてのプレゼンターと Subject同じように保存し、次のように呼び出します: (メソッド 内で) サブスクライブを解除します。スケジューラーを使って退会しようとしましたObservableobs.unsubscribeOn(AndroidSchedulers.mainThread())onPause()Schedulers.immediate()

ただし、コールバックはとにかく X 回呼び出されるため (X は、タイマーにサブスクライブしたすべてのプレゼンターです)、サブスクライブ解除ではありません。また、ログ"Unsubcribed from subject!"が呼び出されていません。

すべてのサブジェクトから正しく購読を解除するにはどうすればよいですか?

前もって感謝します

編集:

コメントにより、実装の詳細が追加されました。

Singletonこれは、Observable を作成してクラスのメンバーに格納する部分ですStatusManager(ステータスもシングルトンです)。

private Observable<BasketStatus> mObservable;
private Status mStatus;

public Observable<BasketStatus> start(long milliseconds, Status status, Callback callback) {

    if (mObservable == null) mObservable = createObservable(milliseconds, status);

    return register(callback);
}

private Observable<BasketStatus> createObservable(long milliseconds, Status status) {

    mStatus = status;

    return Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILLISECONDS).map(t -> status.upgradeStatus()))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .share();
}

public Observable<BasketStatus> register(Callback callback) {

    PublishSubject<Status> subject = PublishSubject.create();
    subject.subscribe(status -> {},
            throwable -> L.LOGE(TAG, throwable.getMessage()),
            () -> callback.onStatusChanged(mStatus));

    mObservable.subscribe(subject);
    callback.onStatusChanged(mStatus));

    subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
    return subject.asObservable();
}

タイマーを開始するメソッドを呼び出した後start(...)、次のプレゼンターからメソッドを呼び出します。Presenterregister(...)

class Presenter implements Callback {

    private Observable<BasketStatus> mRegister;

    @Inject
    public Presenter(Status status, StatusManager statusManager) {
        mRegister = statusManager.start(20000, status, this);
    }

    // Method called from onPause()
    public void unregisterFromBasketStatus() {
         mRegister.unsubscribeOn(Schedulers.immediate());
    }
}

そして次のプレゼンターは…

@Inject
public NextPresenter(StatusManager statusManager) {
    mBasketStatusManager.register(this);
}
4

1 に答える 1