3

リアクティブストリームを操作しながら、閉じる必要があるさまざまなリソースを使用してアプリケーションに取り組んでいます。

オブジェクトへの参照を保持する flyweight パターンに基づくファクトリがあり、AutoCloseable インターフェイスを実装しています。問題は、Autocloseable クラス内で close() を使用していることです。これが私の質問です: 工場内の閉じたリソースへの参照を削除するための最良の解決策は何ですか? ある種のイベントをスローしてファクトリでキャッチできますか、またはリソースを閉じることができるすべてのアクションの後に、参照マップを反復処理して閉じたリソースを削除する必要がありますか?

より良いコンテキストのために:ディレクトリイベント(ファイル/ディレクトリの作成、削除)を発行するreactivex Observableを使用しています。すべてのサブスクライバーがサブスクライブを解除した後、使用しているWatchServiceを閉じています。

編集#1

ここで、私のファクトリ クラスは次のようになります。

public final class Factory {

    private final ConcurrentHashMap<String, ReactiveStream> reactiveStreams = new ConcurrentHashMap<>();

    public ReactiveStream getReactiveStream(Path path) throws IOException {

        ReactiveStream stream = reactiveStreams.get(path.toString());

        if (stream != null) return stream;

        stream = new ReactiveStream(path);
        reactiveStreams.put(path.toString(), stream);

        return stream;

    }

}

そして、これが私の ReactiveStream クラスがどのように見えるかです:

public class ReactiveStream implements AutoCloseable {

    (...)
    private WatchService service;
    private Observable<Event> observable;

    public Observable<Event> getObservable() throws IOException {

        (...) // where i create observable

        return observable;
    }

    (...)

    @Override
    public void close() throws IOException {
        service.close();
    }
}

ご覧のとおり、ReactiveStream クラスへの参照を保持するファクトリがあります。これは、観察可能になった後に閉じられ、サブスクライブされなくなります (共有を使用する前に doOnUnsubscribe(() -> close()) を使用する方法で行いました)。 () 監視可能であるため、サブスクライバーが存在しない場合、doOnUnsubscribe が呼び出されます)。

私の質問は、Factory から閉じた ReactiveStream への参照を閉じた後に削除するにはどうすればよいですか?

編集#2

observable = Observable.fromCallable(new EventObtainer()).flatMap(Observable::from).subscribeOn(Schedulers.io()).repeat().doOnUnsubscribe(() -> {
                try {
                    close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).share();

オブザーバブルを作成する方法は次のとおりです。EventObtainer は、各サブスクライバーがサブスクライブを停止した後に閉じる必要がある WatchService を使用する ReactiveStream のネストされたクラスです。

4

1 に答える 1