2

擬似コードで次の関数があります。

Result calc(Data data) {
  if (data.isFinal()) {
    return new Result(data); // This is the actual lengthy calculation
  } else {
    List<Result> results = new ArrayList<Result>();
    for (int i=0; i<data.numOfSubTasks(); ++i) {
      results.add(calc(data.subTask(i));
    }
    return new Result(results); // merge all results in to a single result
  }
}

固定数のスレッドを使用して、並列化したいと思います。

私の最初の試みは:

ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);

Result calc(Data data) {
  if (data.isFinal()) {
    return new Result(data); // This is the actual lengthy calculation
  } else {
    List<Result> results = new ArrayList<Result>();
    List<Callable<Void>> callables = new ArrayList<Callable<Void>>();
    for (int i=0; i<data.numOfSubTasks(); ++i) {
      callables.add(new Callable<Void>() {
        public Void call() {
         results.add(calc(data.subTask(i));
        }
      });
    }
    executorService.invokeAll(callables);  // wait for all sub-tasks to complete
    return new Result(results); // merge all results in to a single result
  }
}

ただし、これはすぐにデッドロックに陥りました。これは、最上位の再帰レベルがすべてのスレッドの終了を待機している間、内部レベルもスレッドが使用可能になるのを待機しているためです...

デッドロックなしでプログラムを効率的に並列化するにはどうすればよいですか?

4

2 に答える 2

5

問題は、依存関係のあるタスクにThreadPoolExecutorを使用する場合の一般的な設計上の問題です。

2つのオプションがあります。

1)まだ開始されていないタスクに依存する実行中のタスクが発生しないように、必ずボトムアップの順序でタスクを送信してください。

2)「直接ハンドオフ」戦略を使用します(ThreadPoolExecutorのドキュメントを参照)。

ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
executor.setRejectedExecutionHandler(new CallerRunsPolicy());

アイデアは、タスクが実際のキューで待機しないように同期キューを使用することです。拒否ハンドラーは、実行可能なスレッドがないタスクを処理します。この特定のハンドラーを使用すると、サブミッタースレッドは拒否されたタスクを実行します。

このエグゼキュータ構成は、タスクが拒否されないこと、およびタスク間の依存関係が原因でデッドロックが発生しないことを保証します。

于 2013-02-26T08:51:35.957 に答える
0

アプローチを 2 つのフェーズに分割する必要があります。

  1. data.isFinal() == true まですべてのツリーを作成します
  2. 結果を再帰的に収集します (マージによって他の操作/呼び出しが生成されない場合にのみ可能)

そのために、 を使用[Futures][1]して結果を非同期にすることができます。calc のすべての結果が Future[Result] 型になることを意味します。

Future をすぐに返すと、現在のスレッドが解放され、他のスレッドを処理するためのスペースが確保されます。結果のコレクション (new Result(results)) では、すべての結果が準備できるまで待機する必要があります (ScatterGather-Pattern、セマフォを使用してすべての結果を待機できます)。コレクション自体はツリーをたどり、チェック (または結果が到着するのを待つ) は単一のスレッドで行われます。

全体として、結果を収集し、スレッドプールで「高価な」操作のみを実行するために使用される Future のツリーを構築します。

于 2013-02-26T08:42:03.073 に答える