428

次のように、一度に4つのタスクを実行する必要があります。

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

それらがすべて完了したら、どうすれば通知を受け取ることができますか? 今のところ、いくつかのグローバル タスク カウンターを設定し、すべてのタスクの最後にそれを減らし、無限ループでこのカウンターが 0 になるのを監視する以外に良いことは考えられません。または、Future のリストを取得し、それらすべてについて無限ループ モニター isDone を取得します。無限ループを含まないより良い解決策は何ですか?

ありがとう。

4

27 に答える 27

494

基本的にExecutorServiceあなたが電話shutdown()してからawaitTermination()

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}
于 2009-08-09T04:44:49.593 に答える
192

CountDownLatchを使用します。

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

そしてあなたのタスク内で(try / finallyで囲みます)

latch.countDown();
于 2009-08-09T04:46:20.573 に答える
93

ExecutorService.invokeAll()あなたのためにそれを行います。

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);
于 2009-08-09T04:52:19.127 に答える
50

先物のリストも使用できます。

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

次に、それらすべてに参加したい場合、本質的にはそれぞれに参加することと同等です(子スレッドからメインに例外を再発生させるという追加の利点があります):

for(Future f: this.futures) { f.get(); }

基本的には、(すべてまたはそれぞれ) で isDone() を呼び出す無限ループの代わりに、各 Future で .get() を一度に 1 つずつ呼び出すのが秘訣です。したがって、最後のスレッドが終了するとすぐに、このブロックを通過して通過することが保証されます。警告は、.get() 呼び出しが例外を再発生させるため、スレッドの 1 つが停止した場合、おそらく他のスレッドが完了する前にこれから例外を発生させることです [これを回避するためcatch ExecutionExceptionに、get 呼び出しの周りにa を追加できます]。もう1つの注意点は、すべてのスレッドへの参照を保持するため、スレッドローカル変数がある場合、このブロックを通過するまで収集されないことです(問題が発生した場合は、削除することで回避できる場合があります) Future は ArrayList から外れています)。どの Future が「最初に終了する」か知りたい場合https://stackoverflow.com/a/31885029/32453

于 2012-11-02T16:07:38.917 に答える
47

Java8 では、 CompletableFutureでそれを行うことができます:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();
于 2016-04-29T10:45:01.513 に答える
28

ちょうど私の2セント。事前にタスクの数を知るという要件を克服するためにCountDownLatch、単純な を使用して昔ながらの方法でそれを行うことができますSemaphore

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.aquire(numberOfTasks);
...

あなたのタスクではs.release()、あなたがするように呼び出すだけですlatch.countDown();

于 2012-01-19T10:48:28.830 に答える
12

Java 5以降のCyclicBarrierクラスは、この種のもののために設計されています。

于 2009-08-09T07:54:55.483 に答える
5

通知を送信する別のランナブルでタスクをラップできます。

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});
于 2009-08-09T04:46:45.753 に答える
3

あなたの問題を解決するサンプルプログラムを書きました。簡潔な実装が与えられていなかったので、追加します。executor.shutdown()とを使用できますがexecutor.awaitTermination()、異なるスレッドにかかる時間は予測できないため、ベスト プラクティスではありません。

ExecutorService es = Executors.newCachedThreadPool();
    List<Callable<Integer>> tasks = new ArrayList<>();

    for (int j = 1; j <= 10; j++) {
        tasks.add(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                System.out.println("Starting Thread "
                        + Thread.currentThread().getId());

                for (int i = 0; i < 1000000; i++) {
                    sum += i;
                }

                System.out.println("Stopping Thread "
                        + Thread.currentThread().getId());
                return sum;
            }

        });
    }

    try {
        List<Future<Integer>> futures = es.invokeAll(tasks);
        int flag = 0;

        for (Future<Integer> f : futures) {
            Integer res = f.get();
            System.out.println("Sum: " + res);
            if (!f.isDone()) 
                flag = 1;
        }

        if (flag == 0)
            System.out.println("SUCCESS");
        else
            System.out.println("FAILED");

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
于 2012-12-30T19:15:36.497 に答える
3

ExecutorService によるクリーンな方法

 List<Future<Void>> results = null;
 try {
     List<Callable<Void>> tasks = new ArrayList<>();
     ExecutorService executorService = Executors.newFixedThreadPool(4);
     results = executorService.invokeAll(tasks);
 } catch (InterruptedException ex) {
     ...
 } catch (Exception ex) {
     ...
 }
于 2020-09-09T21:00:01.843 に答える
3

ここで、ラッチ/バリアを使用するのとは異なる、より多くの代替手段を提供するだけです。CompletionServiceを使用して、すべてが完了するまで部分的な結果を取得することもできます。

実際の Java Concurrency から: 「Executor に送信する計算のバッチがあり、それらの結果が利用可能になったときにそれらの結果を取得したい場合は、各タスクに関連付けられた Future を保持し、 get を呼び出すことで完了を繰り返しポーリングできます。ゼロのタイムアウト。これは可能ですが、面倒です。幸いなことに、完了サービスというより良い方法があります。"

ここで実装

public class TaskSubmiter {
    private final ExecutorService executor;
    TaskSubmiter(ExecutorService executor) { this.executor = executor; }
    void doSomethingLarge(AnySourceClass source) {
        final List<InterestedResult> info = doPartialAsyncProcess(source);
        CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
        for (final InterestedResult interestedResultItem : info)
            completionService.submit(new Callable<PartialResult>() {
                public PartialResult call() {
                    return InterestedResult.doAnOperationToGetPartialResult();
                }
        });

    try {
        for (int t = 0, n = info.size(); t < n; t++) {
            Future<PartialResult> f = completionService.take();
            PartialResult PartialResult = f.get();
            processThisSegment(PartialResult);
            }
        } 
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } 
        catch (ExecutionException e) {
            throw somethinghrowable(e.getCause());
        }
    }
}
于 2016-03-04T17:51:40.230 に答える
2

次のコードを使用できます。

public class MyTask implements Runnable {

    private CountDownLatch countDownLatch;

    public MyTask(CountDownLatch countDownLatch {
         this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
         try {
             //Do somethings
             //
             this.countDownLatch.countDown();//important
         } catch (InterruptedException ex) {
              Thread.currentThread().interrupt();
         }
     }
}

CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
     taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");
于 2017-05-23T06:15:40.980 に答える
1

メソッドを使用する必要がexecutorService.shutdown()ありexecutorService.awaitTerminationます。

例は次のとおりです。

public class ScheduledThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
                0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }

}
于 2016-06-21T05:41:18.030 に答える
1

ExecutorCompletionServiceの独自のサブクラスを使用して wrapし、 BlockingQueuetaskExecutorの独自の実装を使用して、各タスクが完了したときに通知を受け、完了したタスクの数が目的の目標に達したときに必要なコールバックまたはその他のアクションを実行できます。

于 2009-08-09T04:49:00.383 に答える
0

Java 8 - ストリーム API を使用してストリームを処理できます。以下のスニペットを参照してください

final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool

//alternatively to specify parallelism 
new ForkJoinPool(15).submit(
          () -> tasks.stream().parallel().forEach(Runnable::run) 
    ).get();
于 2016-07-19T23:32:06.777 に答える
-1

これは役立つかもしれません

Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
                try {
                    Log.i(LOG_TAG, "Waiting for executor to terminate...");
                    if (executor.isTerminated())
                        break;
                    if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                } catch (InterruptedException ignored) {}
            }
于 2015-10-22T05:57:56.277 に答える
-1

このRunnerクラスでwaitTillDone()を呼び出すことができます。

Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool

while(...) {
    runner.run(new MyTask()); // here you submit your task
}


runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)


runner.shutdown(); // once you done you can shutdown the runner

このクラスを再利用して、shutdown() を呼び出す前に何度でも waitTillDone() を呼び出すことができます。また、コードは非常に単純です。また、事前にタスクの数を知る必要はありません

それを使用するには、この gradle/mavencompile 'com.github.matejtymes:javafixes:1.3.1'依存関係をプロジェクトに追加するだけです。

詳細については、次を参照してください。

https://github.com/MatejTymes/JavaFixes

于 2016-04-29T01:29:21.863 に答える
-2

getActiveCount()エグゼキューターには、アクティブなスレッドの数を示すメソッドがあります。

スレッドをスパンした後、activeCount()値が であるかどうかを確認できます0。値がゼロになると、現在実行中のアクティブなスレッドがないことを意味し、タスクが終了したことを意味します。

while (true) {
    if (executor.getActiveCount() == 0) {
    //ur own piece of code
    break;
    }
}
于 2014-10-31T09:28:49.267 に答える