1

ForkJoinPool の awaitQuiescence メソッドを使用して、送信されたすべてのタスクが完了するまで待機するか、タイムアウト後にタスクがまだ完了していない場合は false を返そうとしています。

事実上、サブミットされたすべてのタスクが追加のタスクをプールに追加できるため、awaitTermination メソッドは使用できません。これらの追加のタスクがサブミットされないようにするためです。ただし、awaitQuiescence は、指定された時間が経過しても何も返しません。

以下のコードで問題を具体化しようとしました。CountDownLatch.await は決してトリガーされませんが、なぜ awaitQuiescence メソッドは false を返さないのでしょうか?

public static void main(String[] args) {
    final ForkJoinPool test = new ForkJoinPool(1,
            ForkJoinPool.defaultForkJoinWorkerThreadFactory, null,true);
    final CountDownLatch latch = new CountDownLatch(1);

    test.execute(() -> { 
            try {
                System.out.println("Sleeping");
                Future<Double> f = test.submit(() -> { 
                        latch.await(); 
                        return 0d; 
                    });
                System.out.println(f.get());
                System.out.println("Waking up");
    } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            });

    System.out.println(test.awaitQuiescence(1, TimeUnit.SECONDS));
} 

どうもありがとう!

4

1 に答える 1

1

awaitQuiescence メソッドが false を返さないのはなぜですか?

保留中のタスクがある間はawaitQuiescence無視timeoutし、呼び出し元のスレッドでタスクを実行するようです (ソース コードを参照)。

スレッド ダンプ:

"ForkJoinPool-1-worker-1" [...] Object.wait() [...]
   java.lang.Thread.State: WAITING (on object monitor)
  [...]
  at java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:995)
  [...]
  at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

[...]

"main" [...] waiting on condition [...]
   java.lang.Thread.State: WAITING (parking)
  [...]
  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
  [...]
  at java.util.concurrent.ForkJoinTask$AdaptedCallable.exec(ForkJoinTask.java:1445)
  at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
  at java.util.concurrent.ForkJoinPool.awaitQuiescence(ForkJoinPool.java:3097)
  [...]

「メイン」スレッドは 2 番目のタスクを実行し、ラッチを待機するため、awaitQuiescence決して終了しません。

私の意見では、これはバグです。javadocに基づいて、メソッドの最大実行時間 (「最大待機時間」) は約timeoutであると想定しますが、上限は実際には保留中のすべてのタスクとそのすべての「子孫」の実行時間に似ています。 (おそらく端末のものを除く)。

一方、FJ プールは、このタイプのタスク (プール管理されていない同期) には適していません。ForkJoinTask のjavadocから:

計算は、理想的には、同期されたメソッドまたはブロックを回避する必要があり、他のタスクへの参加や、フォーク/ジョイン スケジューリングと連携するように宣伝されている Phaser などのシンクロナイザーの使用とは別に、他のブロッキング同期を最小限に抑える必要があります。

[...]

ブロックする可能性のある ForkJoinTasks を定義して使用することは可能ですが、実行するにはさらに 3 つの考慮事項が必要です。決して結合されないイベント スタイルの非同期タスク (たとえば、CountedCompleter をサブクラス化するタスク) は、多くの場合、このカテゴリに分類されます。(2) リソースへの影響を最小限に抑えるために、タスクは小さくする必要があります。理想的には、(おそらく) ブロッキング アクションのみを実行します。(3) ForkJoinPool.ManagedBlocker API が使用されていない場合、またはブロックされる可能性のあるタスクの数がプールの ForkJoinPool.getParallelism() レベルよりも少ないことがわかっている場合を除き、プールは、進行または良好なパフォーマンスを確保するために十分なスレッドが使用可能であることを保証できません。 .

ThreadPoolExecutor使用および/またはエミュレートすることを検討してくださいawaitQuiescence(例: Phaserの使用)。可能な実装のスケッチ:

class TaskTrackingExecutorService implements ExecutorService {

  private final ExecutorService delegate;
  private final Phaser taskTracker = new Phaser();

  public TaskTrackingExecutorService(ExecutorService delegate) {
    this.delegate = delegate;
  }

  @Override
  public <T> Future<T> submit(Callable<T> task) {
    return delegate.submit(() -> {
      taskTracker.register();
      try {
        return task.call();
      } finally {
        taskTracker.arriveAndDeregister();
      }
    });
  }

  @Override
  public void execute(Runnable command) {
    submit(Executors.callable(command));
  }

  public boolean awaitQuiescence(long timeout, TimeUnit timeUnit) throws InterruptedException {
    taskTracker.register();
    try {
      taskTracker.awaitAdvanceInterruptibly(taskTracker.arriveAndDeregister(), timeout, timeUnit);
      return true;
    } catch (TimeoutException e) {
      return false;
    }
  }

  @Override
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return delegate.awaitTermination(timeout, unit);
  }

  // rest is similar: either use submit method or the delegate.

}


public class Test {
  public static void main(String[] args) throws InterruptedException {
    TaskTrackingExecutorService pool =
        new TaskTrackingExecutorService(Executors.newCachedThreadPool());
    CountDownLatch latch = new CountDownLatch(1);

    pool.execute(() -> {
          System.out.println("Sleeping");
          Future<Double> f = pool.submit(() -> {
            latch.await();
            return 0d;
          });
          try {
            System.out.println(f.get());
          } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
          }
          System.out.println("Waking up");
        }
    );

    System.out.println(pool.awaitQuiescence(2, TimeUnit.SECONDS));
  }
}
于 2016-12-10T16:53:17.430 に答える