awaitQuiescence メソッドが false を返さないのはなぜですか?
保留中のタスクがある間はawaitQuiescence
無視timeout
し、呼び出し元のスレッドでタスクを実行するようです (ソース コードを参照)。
スレッド ダンプ:
"ForkJoinPool-1-worker-1" [...] Object.wait() [...]
java.lang.Thread.State: WAITING (on object monitor)
[...]
at java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:995)
[...]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
[...]
"main" [...] waiting on condition [...]
java.lang.Thread.State: WAITING (parking)
[...]
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
[...]
at java.util.concurrent.ForkJoinTask$AdaptedCallable.exec(ForkJoinTask.java:1445)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool.awaitQuiescence(ForkJoinPool.java:3097)
[...]
「メイン」スレッドは 2 番目のタスクを実行し、ラッチを待機するため、awaitQuiescence
決して終了しません。
私の意見では、これはバグです。javadocに基づいて、メソッドの最大実行時間 (「最大待機時間」) は約timeout
であると想定しますが、上限は実際には保留中のすべてのタスクとそのすべての「子孫」の実行時間に似ています。 (おそらく端末のものを除く)。
一方、FJ プールは、このタイプのタスク (プール管理されていない同期) には適していません。ForkJoinTask のjavadocから:
計算は、理想的には、同期されたメソッドまたはブロックを回避する必要があり、他のタスクへの参加や、フォーク/ジョイン スケジューリングと連携するように宣伝されている Phaser などのシンクロナイザーの使用とは別に、他のブロッキング同期を最小限に抑える必要があります。
[...]
ブロックする可能性のある ForkJoinTasks を定義して使用することは可能ですが、実行するにはさらに 3 つの考慮事項が必要です。決して結合されないイベント スタイルの非同期タスク (たとえば、CountedCompleter をサブクラス化するタスク) は、多くの場合、このカテゴリに分類されます。(2) リソースへの影響を最小限に抑えるために、タスクは小さくする必要があります。理想的には、(おそらく) ブロッキング アクションのみを実行します。(3) ForkJoinPool.ManagedBlocker API が使用されていない場合、またはブロックされる可能性のあるタスクの数がプールの ForkJoinPool.getParallelism() レベルよりも少ないことがわかっている場合を除き、プールは、進行または良好なパフォーマンスを確保するために十分なスレッドが使用可能であることを保証できません。 .
ThreadPoolExecutor
使用および/またはエミュレートすることを検討してくださいawaitQuiescence
(例: Phaserの使用)。可能な実装のスケッチ:
class TaskTrackingExecutorService implements ExecutorService {
private final ExecutorService delegate;
private final Phaser taskTracker = new Phaser();
public TaskTrackingExecutorService(ExecutorService delegate) {
this.delegate = delegate;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(() -> {
taskTracker.register();
try {
return task.call();
} finally {
taskTracker.arriveAndDeregister();
}
});
}
@Override
public void execute(Runnable command) {
submit(Executors.callable(command));
}
public boolean awaitQuiescence(long timeout, TimeUnit timeUnit) throws InterruptedException {
taskTracker.register();
try {
taskTracker.awaitAdvanceInterruptibly(taskTracker.arriveAndDeregister(), timeout, timeUnit);
return true;
} catch (TimeoutException e) {
return false;
}
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}
// rest is similar: either use submit method or the delegate.
}
public class Test {
public static void main(String[] args) throws InterruptedException {
TaskTrackingExecutorService pool =
new TaskTrackingExecutorService(Executors.newCachedThreadPool());
CountDownLatch latch = new CountDownLatch(1);
pool.execute(() -> {
System.out.println("Sleeping");
Future<Double> f = pool.submit(() -> {
latch.await();
return 0d;
});
try {
System.out.println(f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("Waking up");
}
);
System.out.println(pool.awaitQuiescence(2, TimeUnit.SECONDS));
}
}