50

私が理解している限りForkJoinPool、そのプールは固定数のスレッド(デフォルト:コアの数)を作成し、それ以上のスレッドを作成することはありません(アプリケーションがを使用してスレッドの必要性を示さない限りmanagedBlock)。

しかし、使用するForkJoinPool.getPoolSize()と、30,000個のタスク(RecursiveAction)を作成するプログラムで、ForkJoinPoolそれらのタスクを実行すると、平均で700個のスレッドが使用されることがわかりました(タスクが作成されるたびにスレッドがカウントされます)。タスクはI/Oを実行しませんが、純粋な計算を実行します。タスク間の同期は、の呼び出しForkJoinTask.join()とアクセスのみAtomicBooleanです。つまり、スレッドブロッキング操作はありません。

私が理解しているように、呼び出し元のスレッドをブロックしないのでjoin()、プール内のスレッドがブロックされる理由はありません。したがって、(私が想定していた)それ以上のスレッドを作成する理由はないはずです(それでも明らかに発生しています) 。

では、なぜForkJoinPoolこれほど多くのスレッドを作成するのでしょうか。作成されるスレッドの数を決定する要因は何ですか?

コードを投稿せずにこの質問に答えられることを望んでいましたが、ここではリクエストに応じて提供されます。このコードは、4倍のサイズのプログラムからの抜粋であり、重要な部分に縮小されています。そのままではコンパイルされません。もちろん、必要に応じて、プログラム全体を投稿することもできます。

プログラムは、深さ優先探索を使用して、特定の始点から特定の終点までのパスを迷路で検索します。解決策が存在することが保証されています。主なロジックは次のcompute()方法にありSolverTaskます:ARecursiveActionある特定のポイントで開始し、現在のポイントから到達可能なすべての隣接ポイントで継続します。新しいものを作成するのではなくSolverTask各分岐点(非常に多くのタスクを作成する)で、1つを除くすべてのネイバーをバックトラッキングスタックにプッシュして後で処理し、スタックにプッシュされていない1つのネイバーのみを続行します。そのように行き止まりに達すると、バックトラッキングスタックに最後にプッシュされたポイントがポップされ、そこから検索が続行されます(それに応じて、タックの開始点から構築されたパスが削減されます)。タスクが特定のしきい値よりも大きいバックトラッキングスタックを検出すると、新しいタスクが作成されます。その時から、タスクはバックトラックスタックからポップし続け、それが使い果たされるまで、分岐ポイントに到達したときにスタックにそれ以上のポイントをプッシュしませんが、そのようなポイントごとに新しいタスクを作成します。したがって、タスクのサイズは、スタック制限しきい値を使用して調整できます。

上で引用した数値(「30,000タスク、平均700スレッド」)は、5000x5000セルの迷路を検索したものです。したがって、ここに重要なコードがあります:

class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000;

/**
 * @return Tries to compute a path through the maze from local start to end
 * and returns that (or null if no such path found)
 */
@Override
public ArrayDeque<Point>  compute() {
    // Is this task still accepting new branches for processing on its own,
    // or will it create new tasks to handle those?
    boolean stillAcceptingNewBranches = true;
    Point current = localStart;
    ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>();  // Path from localStart to (including) current
    ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
    // Used as a stack: Branches not yet taken; solver will backtrack to these branching points later

    Direction[] allDirections = Direction.values();

    while (!current.equals(end)) {
        pathFromLocalStart.addLast(current);
        // Collect current's unvisited neighbors in random order: 
        ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);  
        for (Direction directionToNeighbor: allDirections) {
            Point neighbor = current.getNeighbor(directionToNeighbor);

            // contains() and hasPassage() are read-only methods and thus need no synchronization
            if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
                neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
        }
        // Process unvisited neighbors
        if (neighborsToVisit.size() == 1) {
            // Current node is no branch: Continue with that neighbor
            current = neighborsToVisit.getFirst().getPoint();
            continue;
        }
        if (neighborsToVisit.size() >= 2) {
            // Current node is a branch
            if (stillAcceptingNewBranches) {
                current = neighborsToVisit.removeLast().getPoint();
                // Push all neighbors except one on the backtrack stack for later processing
                for(PointAndDirection neighborAndDirection: neighborsToVisit) 
                    backtrackStack.push(neighborAndDirection);
                if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
                    stillAcceptingNewBranches = false;
                // Continue with the one neighbor that was not pushed onto the backtrack stack
                continue;
            } else {
                // Current node is a branch point, but this task does not accept new branches any more: 
                // Create new task for each neighbor to visit and wait for the end of those tasks
                SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
                int t = 0;
                for(PointAndDirection neighborAndDirection: neighborsToVisit)  {
                    SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
                    task.fork();
                    subTasks[t++] = task;
                }
                for (SolverTask task: subTasks) {
                    ArrayDeque<Point> subTaskResult = null;
                    try {
                        subTaskResult = task.join();
                    } catch (CancellationException e) {
                        // Nothing to do here: Another task has found the solution and cancelled all other tasks
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (subTaskResult != null) { // subtask found solution
                        pathFromLocalStart.addAll(subTaskResult);
                        // No need to wait for the other subtasks once a solution has been found
                        return pathFromLocalStart;
                    }
                } // for subTasks
            } // else (not accepting any more branches) 
        } // if (current node is a branch)
        // Current node is dead end or all its neighbors lead to dead ends:
        // Continue with a node from the backtracking stack, if any is left:
        if (backtrackStack.isEmpty()) {
            return null; // No more backtracking avaible: No solution exists => end of this task
        }
        // Backtrack: Continue with cell saved at latest branching point:
        PointAndDirection pd = backtrackStack.pop();
        current = pd.getPoint();
        Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
        // DEBUG System.out.println("Backtracking to " +  branchingPoint);
        // Remove the dead end from the top of pathSoFar, i.e. all cells after branchingPoint:
        while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
            // DEBUG System.out.println("    Going back before " + pathSoFar.peekLast());
            pathFromLocalStart.removeLast();
        }
        // continue while loop with newly popped current
    } // while (current ...
    if (!current.equals(end)) {         
        // this task was interrupted by another one that already found the solution 
        // and should end now therefore:
        return null;
    } else {
        // Found the solution path:
        pathFromLocalStart.addLast(current);
        return pathFromLocalStart;
    }
} // compute()
} // class SolverTask

@SuppressWarnings("serial")
public class ParallelMaze  {

// for each cell in the maze: Has the solver visited it yet?
private final AtomicBoolean[][] visited;

/**
 * Atomically marks this point as visited unless visited before
 * @return whether the point was visited for the first time, i.e. whether it could be marked
 */
boolean visit(Point p) {
    return  visited[p.getX()][p.getY()].compareAndSet(false, true);
}

public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();
    ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
    // Start initial task
    long startTime = System.currentTimeMillis();
     // since SolverTask.compute() expects its starting point already visited, 
    // must do that explicitly for the global starting point:
    maze.visit(maze.start);
    maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
    // One solution is enough: Stop all tasks that are still running
    pool.shutdownNow();
    pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
    long endTime = System.currentTimeMillis();
    System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " + 
            width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
}
4

5 に答える 5

17

関連する質問がstackoverflowにあります:

InvokeAll/join 中に ForkJoinPool が停止する

ForkJoinPool がスレッドを浪費しているようです

何が起こっているかの実行可能なストリップダウンバージョンを作成しました(使用したjvm引数:-Xms256m -Xmx1024m -Xss8m):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class Test1 {

    private static ForkJoinPool pool = new ForkJoinPool(2);

    private static class SomeAction extends RecursiveAction {

        private int counter;         //recursive counter
        private int childrenCount=80;//amount of children to spawn
        private int idx;             // just for displaying

        private SomeAction(int counter, int idx) {
            this.counter = counter;
            this.idx = idx;
        }

        @Override
        protected void compute() {

            System.out.println(
                "counter=" + counter + "." + idx +
                " activeThreads=" + pool.getActiveThreadCount() +
                " runningThreads=" + pool.getRunningThreadCount() +
                " poolSize=" + pool.getPoolSize() +
                " queuedTasks=" + pool.getQueuedTaskCount() +
                " queuedSubmissions=" + pool.getQueuedSubmissionCount() +
                " parallelism=" + pool.getParallelism() +
                " stealCount=" + pool.getStealCount());
            if (counter <= 0) return;

            List<SomeAction> list = new ArrayList<>(childrenCount);
            for (int i=0;i<childrenCount;i++){
                SomeAction next = new SomeAction(counter-1,i);
                list.add(next);
                next.fork();
            }


            for (SomeAction action:list){
                action.join();
            }
        }
    }

    public static void main(String[] args) throws Exception{
        pool.invoke(new SomeAction(2,0));
    }
}

どうやら、結合を実行すると、現在のスレッドは必要なタスクがまだ完了していないことを認識し、別のタスクを実行する必要があります。

で起こりjava.util.concurrent.ForkJoinWorkerThread#joinTaskます。

ただし、この新しいタスクは同じタスクをさらに生成しますが、スレッドが結合でロックされているため、プール内のスレッドを見つけることができません。そして、それらが解放されるのにどれくらいの時間が必要かを知る方法がないため (スレッドが無限ループにあるか、永遠にデッドロック状態になる可能性があります)、新しいスレッドが生成されます ( Louis Wassermanが述べたように、結合されたスレッドを補正します) ):java.util.concurrent.ForkJoinPool#signalWork

したがって、このようなシナリオを防ぐには、タスクの再帰的な生成を避ける必要があります。

たとえば、上記のコードで初期パラメーターを 1 に設定した場合、childrenCount を 10 倍に増やしても、アクティブなスレッドの量は 2 になります。

また、アクティブなスレッドの量が増加する一方で、実行中のスレッドの量はparallelismと同じかそれ以下であることに注意してください。

于 2013-11-21T17:54:30.700 に答える
12

ソースコメントから:

補正: 十分な数のライブ スレッドが存在しない限り、メソッド tryPreBlock() は予備のスレッドを作成または再アクティブ化して、ブロックされたジョイナーがブロック解除されるまで補正します。

何が起こっているのかというと、どのタスクも非常に迅速に終了していないためだと思います。また、新しいタスクを送信したときに使用可能なワーカー スレッドがないため、新しいスレッドが作成されます。

于 2012-05-29T10:59:31.737 に答える
8

strict、full-strict、および terminally-strict は、有向非巡回グラフ (DAG) の処理に関係しています。これらの用語をグーグルで検索して、それらを完全に理解することができます. これは、フレームワークが処理するように設計された処理のタイプです。Recursive... の API のコードを見てください。フレームワークは、compute() コードに依存して他の compute() リンクを実行し、次に join() を実行します。各タスクは、DAG の処理と同様に単一の join() を実行します。

DAG 処理を行っていません。多くの新しいタスクをフォークし、それぞれを待機 (join()) しています。ソースコードを読んでください。それは恐ろしく複雑ですが、あなたはそれを理解することができるかもしれません. フレームワークは適切なタスク管理を行いません。join() を実行するときに、待機中のタスクをどこに配置しますか? 何が終了したかを確認するためにキューを常に監視する必要がある、中断されたキューはありません。これが、フレームワークが「継続スレッド」を使用する理由です。1 つのタスクが join() を実行すると、フレームワークは下位の 1 つのタスクが終了するのを待っていると想定します。多くの join() メソッドが存在する場合、スレッドは続行できないため、ヘルパーまたは継続スレッドが存在する必要があります。

前述のように、スキャッター ギャザー タイプの fork-join プロセスが必要です。そこで、多くのタスクをフォークできます

于 2012-06-01T14:09:47.150 に答える