0

Observable<String>いくつかの作業を実行する があります。完了後、接続を閉じます (経由で) .setDisposable(Disposables.fromAction(logic*);。問題は、このクローズ アクションを実際のワークロードと同じスケジューラで実行する必要があることです。

出来ますか?どのように?

Observable を特定のスケジューラで実行するように強制したく myObservable.subscribeOn(x).unsubscribeOn(x);ありません。

4

1 に答える 1

4

Schedulers.from(Executor)それが機能するには、単一のスレッド化されたスケジューラが必要であり、経由または新規で取得できますSingleScheduler(警告: 内部)。

問題unsubscribeOnは、ダウンストリームが実際にストリームを破棄/キャンセルした場合にのみ実行されることです (これがほぼ常に発生する 1.x とは異なります)。

より良い方法はusing、前述のカスタム スケジューラを使用して、クリーンアップを手動でスケジュールすることです。

Observable<T> createObservable(Scheduler scheduler) {
    return Observable.create(s -> {
        Resource res = ...

        s.setCancellable(() -> 
            scheduler.scheduleDirect(() ->
                res.close() // may need try-catch here
            )
        );

        s.onNext(...);
        s.onComplete();
    }).subscribeOn(scheduler);
}

Scheduler scheduler = new SingleScheduler();

createObservable(scheduler)
.map(...)
.filter(...)
.subscribe(...);

ただし、createロジックが (終了または非同期になることによって) スケジューラを放棄しない限り、同じプールのライブロックが原因でキャンセル ロジックが実行されない可能性があることに注意してください。

于 2016-11-09T14:51:59.203 に答える