0

サーバーとして Spring リアクティブを使用して、高価な生成を行い、結果を Flux で 1 つずつ返します。これには、リクエストがキャンセルされた場合に生成を停止できるという利点があります (制約がきつすぎる場合など)。私のコードは次のようになります。

    public Flux<Entity> generate(int nbrOfEntitiesToGenerate, Constaints constraints) {
        return Flux.range(0, nbrOfEntitiesToGenerate)
            .map(x -> Generator.expensiveGeneration(constraints)
//            .subscribeOn(Schedulers.parallel())
            ;
    }

これは私が望むことの半分しか行いません。expensiveGenerationキャンセルされたときに次の呼び出しを行いませんが、現在実行中の高価な生成を停止しません。制約が厳しすぎると終了しない可能性があります。どうすればそれができますか。

余分な質問ですが、x エンティティを並列に生成して、スレッドを最大限に活用するにはどうすればよいでしょうか (もちろん、すべての世代を一度に開始する必要はありません)。

前もって感謝します。

4

1 に答える 1

0

Scheduleraから aを作成するのExecutorServiceは簡単ですがFuture<?>、キャンセルしたい場合は callable を保存する必要があります。Generatorを保持してメソッドをラップするように変更しました。このメソッドは、ハンドルcancelのときに呼び出されます。FluxdoOnCancel

public class FluxPlay {
    public static void main(String[] args) {
        new FluxPlay().run();
    }
    private void run() {
        Flux<LocalDateTime> f = generate(10);
        Disposable d = f.subscribeOn(Schedulers.single()).subscribe(System.out::println);
        try {
            Thread.sleep(4500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        d.dispose();
    }

    private Flux<LocalDateTime> generate(int nbrOfEntitiesToGenerate) {
        Generator generator = new Generator();
        return Flux.range(0, nbrOfEntitiesToGenerate)
        .map(x -> generator.expensiveGeneration())
        .doOnCancel(generator::cancel)
        .doFinally(generator::shutdown)
        .publishOn(Schedulers.fromExecutor(generator::submit));
    }
}

と:

public class Generator {
    Future<?> f;
    ExecutorService es = Executors.newSingleThreadExecutor();
    public void submit(Runnable command) {
      f = es.submit(command);
    }
    public void cancel() {
        f.cancel(true);
    }
    public void shutdown(SignalType st) {
        es.shutdown();
    }
    public LocalDateTime expensiveGeneration() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            System.out.println("Interrupted");
        }
        return LocalDateTime.now();
    }
}
于 2020-03-12T10:01:00.617 に答える