Schedulers.from(Executor)
それが機能するには、単一のスレッド化されたスケジューラが必要であり、経由または新規で取得できますSingleScheduler
(警告: 内部)。
問題unsubscribeOn
は、ダウンストリームが実際にストリームを破棄/キャンセルした場合にのみ実行されることです (これがほぼ常に発生する 1.x とは異なります)。
より良い方法はusing
、前述のカスタム スケジューラを使用して、クリーンアップを手動でスケジュールすることです。
Observable<T> createObservable(Scheduler scheduler) {
return Observable.create(s -> {
Resource res = ...
s.setCancellable(() ->
scheduler.scheduleDirect(() ->
res.close() // may need try-catch here
)
);
s.onNext(...);
s.onComplete();
}).subscribeOn(scheduler);
}
Scheduler scheduler = new SingleScheduler();
createObservable(scheduler)
.map(...)
.filter(...)
.subscribe(...);
ただし、create
ロジックが (終了または非同期になることによって) スケジューラを放棄しない限り、同じプールのライブロックが原因でキャンセル ロジックが実行されない可能性があることに注意してください。