まず、2 つの静的関数を使用してコードを書き直して、何が起こっているかを簡単に確認できるようにします。
// Make an executor equivalent to Executors.newFixedThreadPool(nThreads)
// that will trace to standard error when a task begins or ends
static ExecutorService loggingExecutor(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.err.println("Executor beginning task on thread: "
+ t.getName());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.err.println("Executor finishing task on thread: "
+ Thread.currentThread().getName());
}
};
}
と
// same as what you pass to thenComposeAsync
static Function<Boolean, CompletableFuture<Boolean>> inner(Executor executor) {
return b -> {
System.err.println(Thread.currentThread().getName()
+ ": About to enqueue task");
CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
executor.execute(() -> {
System.err.println(Thread.currentThread().getName()
+ ": Inner task");
innerFuture.complete(true);
});
System.err.println(Thread.currentThread().getName()
+ ": Task enqueued");
return innerFuture;
};
}
これで、テスト ケースを次のように記述できます。
ExecutorService e = loggingExecutor(1);
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e)
.join();
e.shutdown();
/* Output before deadlock:
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/
2 番目の Future の結果が計算されるまで、最初のスレッドは解放されないという結論をテストしてみましょう。
ExecutorService e = loggingExecutor(2); // use 2 threads this time
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e)
.join();
e.shutdown();
/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
Executor finishing task on thread: pool-1-thread-1
*/
実際、スレッド 1 はスレッド 2 が完了するまで保持されているようです。
thenComposeAsync
それ自体がブロックすることが正しいかどうかを見てみましょう。
ExecutorService e = loggingExecutor(1);
CompletableFuture<Boolean> future =
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e);
System.err.println("thenComposeAsync returned");
future.join();
e.shutdown();
/*
thenComposeAsync returned
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/
thenComposeAsync
ブロックしませんでした。CompletableFuture
すぐに返され、完了しようとしたときにのみデッドロックが発生しました。では、 によって返される未来を完成させるには何が必要.thenComposeAsync(inner(e), e)
でしょうか?
- API は、inner(e) が戻るのを待つ必要があります
CompletableFuture<Boolean>
CompletableFuture<Boolean>
返されたものも完了するのを待つ必要があります。そうして初めて、未来は完成します。したがって、ご覧のとおり、提案したことを実行して不完全な Future を返すことはできません。
バグですか?内部タスクの計算中に CompletionStage がスレッド 1 を保持するのはなぜですか? ご指摘のとおり、ドキュメントはかなり曖昧で、特定の順序でスレッドをリリースすることを約束していないため、これはバグではありません。また、CompletableFuture の後続のthen*()
メソッドには Thread1 が使用されることに注意してください。次の点を考慮してください。
ExecutorService e = loggingExecutor(2);
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e)
.thenRun(() -> System.err.println(Thread.currentThread().getName()
+ ": All done"))
.join();
e.shutdown();
/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
pool-1-thread-1: All done
Executor finishing task on thread: pool-1-thread-1
*/
ご覧のとおり、スレッド 1 で .thenRun(...) が実行されました。これは、CompletableFuture の他の *Async(... , Executor exec) メソッドと一致していると思います。
thenComposeAsync
しかし、スレッドのジャグリングを API に任せるのではなく、 の機能を 2 つの個別に制御可能なステップに分割したい場合はどうでしょうか? これを行うことができます:
ExecutorService e = loggingExecutor(1);
completedFuture(true)
.thenApplyAsync(inner(e), e) // do the async part first
.thenCompose(x -> x) // compose separately
.thenRun(() -> System.err.println(Thread.currentThread().getName()
+ ": All done"))
.join();
e.shutdown();
すべてがデッドロックなしで 1 つのスレッドでうまく実行されます。
結論として、あなたが言うように、この動作は直感的ではありませんか? 知らない。なぜthenComposeAsync
存在するのか想像できません。メソッドが を返す場合CompletableFuture
、それはブロックされるべきではなく、非同期で呼び出す理由があってはなりません。