3

複数の呼び出し可能オブジェクトを並列に実行したい。しかし、ExecutorServiceは、すべての呼び出し可能オブジェクトが終了するまで常に待機しているようです。

私は次のことを試しました:

final int nThreads = 10;
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
List<PrimeCallable> tasks = new ArrayList<PrimeCallable>();
for(int i = 0; i < nThreads; i++) {
    tasks.add(new PrimeCallable(0, i * 100 + 100, "thread" + i));
}

try {
    for(Future<List<Integer>> result : executorService.invokeAll(tasks)) {
        List<Integer> integers = result.get();
        for(Integer i : integers){
            System.out.println(i);
        }
    }
} catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
} catch (ExecutionException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}

これで、executorService内のすべての呼び出し可能オブジェクトが終了したときにforループが呼び出されます。私の知る限り、executorService.isParallelセッターはありません;-)。

呼び出し可能オブジェクトを並列実行させるための正しいアプローチは何でしょうか?

ヒントをありがとう!

4

4 に答える 4

10

invokeAllのjavadocsは次のように述べています。

指定されたタスクを実行し、すべてが完了すると、ステータスと結果を保持する先物のリストを返します。Future.isDone()は、返されたリストの各要素に対してtrueです。

したがってinvokeAll、コレクション内の各タスクが完了するまでブロックします。

于 2011-06-14T15:28:02.930 に答える
6

エグゼキュータサービスは、すべての呼び出し可能オブジェクトを並行して実行します。実行するのは、すべての並列タスクが完了するのを待ってから次に進むことです。したがって、すべてのタスクが連続して実行される場所とは異なります。

于 2011-06-14T14:44:43.370 に答える
3

あなたが望むものの一部は怠惰な実行であるように聞こえます-結果を抽出する前にメモリ内の構造のコピーを作成する必要はありません。

私はこれを反復+変換の問題として扱います。まず、入力に対してイテレータを定義し、next()を呼び出すたびに、系列の次の値を生成するCallableが返されるようにします。

変換段階では、これらのCallableの並列または同時評価を適用します。これは、次のようなものです(テストされていません)。

public class ConcurrentTransform
{
  private final ExecutorService executor;
  private final int maxBuffer;

  public ConcurrentTransform(ExecutorService executor, int maxWorkBuffer) {
    this.executor = executor;
    this.maxBuffer = Math.max(1, maxWorkBuffer);
  }

  public <T> Iterator<T> apply(final Iterator<Callable<T>> input) {
    // track submitted work
    final BlockingQueue<Future<T>> submitted = new LinkedBlockingQueue<Future<T>>();

    // submit first N tasks
    for (int i=0; i<maxBuffer && input.hasNext(); i++) {
      Callable<T> task = input.next();
      Future<T> future = executor.submit(task);
      submitted.add(future);
    }

    return new Iterator<T>(){
      @Override
      public synchronized boolean hasNext() {
        return !submitted.isEmpty();
      }
      @Override
      public T next() {
        Future<T> result;
        synchronized (this) {
          result = submitted.poll();
          if (input.hasNext()) {
            submitted.add(executor.submit(input.next()));
          }
        }

        if (result != null) {
          try {
            return result.get(); // blocking
          } catch (Exception e) {
            if (e instanceof RuntimeException) {
               throw (RuntimeException) e;
            } else {
               throw new RuntimeException(e);
            }
          }
        } else {
          throw new NoSuchElementException();
        }
      }
      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }};
  }
}

apply(...)を呼び出した後、結果の値を反復処理します。これは、内部でCallableオブジェクトを並列に実行し、入力されたのと同じ順序で結果を返します。いくつかの改良点は、ブロッキングresult.get()呼び出しのオプションのタイムアウトを許可すること、またはトランスフォーム自体の中でスレッドプールを管理することです。

于 2011-09-08T04:56:03.103 に答える
2

結果を発生時に表示する場合は、を使用しExecutorCompletionServiceます。

于 2011-06-14T15:49:50.683 に答える