5

Javaでタスクのバッチを実行する方法を探しています。アイデアはExecutorService、スレッドプールに基づいて、スレッドCallableのさまざまなスレッド間で一連のセットを分散できるようにすることですmain。このクラスは、すべてのタスクが実行されるまでスレッドをスリープ状態にするwaitForCompletionメソッドを提供する必要があります。main次に、mainスレッドを起動する必要があります。スレッドはいくつかの操作を実行し、一連のタスクを再送信します。

このプロセスは何度も繰り返されるので、ExecutorService.shutdownの複数のインスタンスを作成する必要があるので、使用したいと思いますExecutorService

AtomicInteger現在、、、およびLock/を使用して次のように実装していますCondition

public class BatchThreadPoolExecutor extends ThreadPoolExecutor {
  private final AtomicInteger mActiveCount;
  private final Lock          mLock;
  private final Condition     mCondition;

  public <C extends Callable<V>, V> Map<C, Future<V>> submitBatch(Collection<C> batch){
    ...
    for(C task : batch){
      submit(task);
      mActiveCount.incrementAndGet();
    }
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    mLock.lock();
    if (mActiveCount.decrementAndGet() == 0) {
      mCondition.signalAll();
    }
    mLock.unlock();
  }

  public void awaitBatchCompletion() throws InterruptedException {
    ...
    // Lock and wait until there is no active task
    mLock.lock();
    while (mActiveCount.get() > 0) {
      try {
        mCondition.await();
      } catch (InterruptedException e) {
        mLock.unlock();
        throw e;
      }
    }
    mLock.unlock();
  } 
}

バッチからすべてのタスクを一度に送信する必要はないためCountDownLatch、オプションではないようです。

これはそれを行うための有効な方法ですか?それを実装するためのより効率的でエレガントな方法はありますか?

ありがとう

4

3 に答える 3

8

ExecutorService自体が要件を実行できると思います。

すべてのタスクを呼び出しinvokeAll([...])て繰り返します。すべての Future を繰り返すことができれば、すべての Task が終了します。

于 2012-04-24T12:59:22.497 に答える
3

他の回答が指摘しているように、カスタム ExecutorService を必要とするユース ケースはないようです。

あなたがする必要があるのは、バッチを送信し、メインスレッドの割り込みを無視しながらすべてが終了するのを待ってから、おそらく最初のバッチの結果に基づいて別のバッチを送信することだけです。これは次の問題だと思います。

    ExecutorService service = ...;

    Collection<Future> futures = new HashSet<Future>();
    for (Callable callable : tasks) {
        Future future = service.submit(callable);
        futures.add(future);
    }

    for(Future future : futures) {
        try {
            future.get();
        } catch (InterruptedException e) {
            // Figure out if the interruption means we should stop.
        }
    }

    // Use the results of futures to figure out a new batch of tasks.
    // Repeat the process with the same ExecutorService.
于 2012-04-24T13:32:49.933 に答える
0

ExecutorsデフォルトのJavaは、ジョブの「バッチ」を実行するために必要なすべての機能を提供する必要があるという@cquetbachに同意します。

私があなただったら、たくさんのジョブを送信し、それらが で終了するのを待ってからExecutorService.awaitTermination()、新しい を開始しExecutorServiceます。「スレッドの作成」を節約するためにこれを行うことは、これを1秒間に数百回行う場合を除き、時期尚早の最適化です。

ExecutorServiceバッチごとに同じものを使用することに本当にこだわっている場合は、ThreadPoolExecutor自分自身を割り当てて、 を見てループに入ることができますThreadPoolExecutor.getActiveCount()。何かのようなもの:

BlockingQueue jobQueue = new LinkedBlockingQueue<Runnable>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS,
    0L, TimeUnit.MILLISECONDS, jobQueue);
// submit your batch of jobs ...
// need to wait a bit for the jobs to start
Thread.sleep(100);
while (executor.getActiveCount() > 0 && jobQueue.size() > 0) {
    // to slow the spin
    Thread.sleep(1000);
}
// continue on to submit the next batch
于 2012-04-24T13:10:08.833 に答える