2

+-42Mioレコードのデータベースで実行する多くのタスクを起動したいと思います。これを5000レコード/時間のバッチで実行したいと思います(結果として850タスクになります)。また、スレッドの数を(16に)制限したいのですが、Javaがこれを開始し、現在のコードを使用してこのタスクを実行しています。

 ExecutorService executorService = Executors.newFixedThreadPool(16);
 for (int j = 1; j < 900 + 1; j++) {
     int start = (j - 1) * 5000;
     int stop = (j) * 5000- 1;
     FetcherRunner runner = new FetcherRunner(routes, start, stop);
     executorService.submit(runner);

     Thread t = new Thread(runner);
     threadsList.add(t);
     t.start();
 }

これはこれを行う正しい方法ですか?特に、Javaはすべてのタスクを実行するだけだという印象を持っているので...(FetcherRunner実装runnable

4

5 に答える 5

4

これは正しい働き方ですか?

最初の部分は正しいです。ただし、新しいThreadオブジェクトを作成して開始するべきではありません。Runnableを送信すると、ExecutorServiceはそれをキューに入れ、ワーカースレッドが使用可能になったときに実行します。

....スレッドリストを使用して、すべてのスレッドが終了したことを検出し、結果の処理を続行できるようにします。

現在行っていることを実行すると、各タスクが2回実行されます。さらに悪いことに、手動で作成されたスレッドの群れはすべて並行して実行しようとします。

すべてのタスクが完了したことを確認する簡単な方法はawaitTermination(...)、ExecutorServiceを呼び出すことです。(エグゼキュータサービスを正常にシャットダウンしても、同じ効果があります...再度使用する予定がない場合。)

もう1つのアプローチは、Futurefor eachの結果を作成し、すべてのタスクが送信された後に結果FetcherRunnerを試行することです。getこれには、後の結果が生成される前に初期の結果の処理を開始できるという利点があります。(ただし、必要がない場合、またはできない場合は、Futuresを使用しても何も達成されません。)

于 2012-07-18T01:54:52.280 に答える
4

ExecutorServiceを使用する最初の部分は良さそうです:

...
FetcherRunner runner = new FetcherRunner(routes, start, stop);
executorService.submit(runner);

スレッドのある部分はそこにあるべきではありません、私はあなたが以前にそれをどのように持っていたかを示すためだけにそこにあると思いますか?

更新: はい、後のコードは必要ありません。これexecutorService.submit(runner)により、膨大な数のスレッドが生成されることになります。ループ後に送信されたすべてのタスクが完了するのを待つことが目的の場合は、Futureタスクを送信するときにへの参照を取得してFuture、次のように待機できます。

ExecutorService executorService = Executors.newFixedThreadPool(16);
List<Future<Result>> futures = ..;
 for (int j = 1; j < 900+ 1; j++) {
 int start = (j - 1) * 5000;
 int stop = (j) * 5000- 1;
 FetcherRunner runner = new FetcherRunner(routes, start, stop);
 futures.add(executorService.submit(runner));

}
for (Future<Result> future:futures){
    future.get(); //Do something with the results..
}
于 2012-07-18T01:43:26.463 に答える
3

You don't need to the part after the call to submit. The code you have that creates a Thread will result in 900 threads being created! Yowza. The ExecutorService has a pool of 16 threads and you can run 16 jobs at once. Any jobs submitted when all 16 threads are busy will be queued. From the docs:

Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.

So there is no need for yet another thread. If you need to be notified after a task has finished you can have it call out. Other options are to cache all of the Future's returned from submit, and upon each task being finished you can check to see if all Future's are done. After all Future's are finished you can dispatch another function to run. But it will run ON one of the threads in the ExecutorService.

于 2012-07-18T02:01:44.090 に答える
0

コードから変更:

    ExecutorService executorService = Executors.newFixedThreadPool(16);
    for (int j = 1; j < 900 + 1; j++) {
        int start = (j - 1) * 5000;
        int stop = (j) * 5000 - 1;
        FetcherRunner runner = new FetcherRunner(routes, start, stop);
        executorService.submit(runner);

    }
于 2012-07-18T01:56:58.463 に答える
0

最良の方法は、次のようにcountdownlatchを使用することです。

    ExecutorService executorService = Executors.newFixedThreadPool(16);
  CountdownLatch latch = new CountdownLatch(900);
 FetcherRunner runner = new FetcherRunner(routes, start, stop, latch);
 latch.await();

FetcherRunnerのfinallyブロック使用latch.countDown();コードawait()は、すべてのタスクが完了したときにのみ実行されます。

于 2015-01-17T07:24:14.230 に答える