-1

Java でForkJoinPoolを使用して画像を処理しようとしています。ストリームを使用して、画像に対していくつかのカスタム操作を行いました。メソッドにForkJoinPoolを使用しようとしていgetRGBますsetRGBgetRGBメソッドで並列処理を実現するにはどうすればよいですか?

    @Override
    public int[] getRGB(int xStart, int yStart, int w, int h, int[] rgbArray,int offset, int scansize) {

        int[][] sol = new int[h][w];

        int threshold = w;

        class RecursiveSetter extends RecursiveAction {
            int from;
            int to;
            FJBufferedImage image;

            RecursiveSetter(int from, int to, FJBufferedImage image) {
                this.from = from;
                this.to = to;
                this.image = image;
            }

            @Override
            protected void compute() {
                System.out.println("From : " + from + " To : " + to);
                if (from >= to) return;

                if (to - from == 1) {
                    computeDirectly(from);
                    return;
                } else {
                    int mid = from + (to - from) / 2;
                    System.out.println("From : " + from + " To : " + to +
                            "Mid :" + mid);
                    invokeAll(
                            new RecursiveSetter(from, mid, image),
                            new RecursiveSetter(mid + 1, to, image));
                    return;
                }
            }

            void computeDirectly(int row) {
                sol[from] = image.getRealRGB(from, 0, w, 1, null, offset,
                        scansize);

            }
        }

        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        pool.invoke(new RecursiveSetter(0, h-1, this));
        return Arrays.stream(sol)
                .flatMapToInt(Arrays::stream)
                .toArray();
    }

getRealRGBメソッドを単にプロキシしますBufferedImage。これが非現実的かもしれないことは理解していますが、このコンテキストでForkJoinPoolを使用する方法を知りたいだけです。そして、ええ、上記のコードはArrayIndexOutOfBound例外をスローしています。作業負荷を分割する方法 (行 vs 列 vs 小さなグリッド。現在、行ごとに分割しています) としきい値の決定方法について提案してください。

4

1 に答える 1

3

あなたの試みに関する最初のいくつかの発言:

int[][] sol = new int[h][w];

ここでは、Java では 1 次元配列である 2 次元配列を作成しています。これint[]は、型のサブ配列が既に取り込まれている要素型int[]です。要素を で上書きするのでsol[from] = /* something returning an int[] array */、これらのサブ配列の割り当ては廃止されました。したがって、この場合、使用する必要があります

int[][] sol = new int[h][];

代わりは。しかし、外側の配列の 1 次元の性質を認識することで、単純なストリーミング ソリューションで機能することも認識できます。つまり、

int[][] sol = IntStream.range(yStart, yStart+h)
    .parallel()
    .mapToObj(y -> getRealRGB(xStart, y, w, 1, null, 0, scansize))
    .toArray(int[][]::new);

これにより、使用可能なコアにワークロードが分散されます。あなたがしようとしたのと同じように、舞台裏でフォーク/ジョインフレームワークを使用しますが、それは実装の詳細です。それを次のストリーム操作と融合できます。

return IntStream.range(yStart, yStart+h)
    .parallel()
    .flatMap(y -> Arrays.stream(getRealRGB(xStart, y, w, 1, null, 0, scansize)))
    .toArray();

ただし、メソッドのシグネチャを正しく理解していれば、実際にやりたいことがあります

public int[] getRGB(
       int xStart, int yStart, int w, int h, int[] rgbArray, int offset, int scansize) {

    final int[] result = rgbArray!=null? rgbArray: new int[offset+h*scansize];
    IntStream.range(yStart, yStart+h).parallel()
        .forEach(y -> getRealRGB(xStart, y, w, 1, result, offset+y*scansize, scansize));
    return result;
}

契約を履行すること。これにより、コピー操作の回数も最小限に抑えられます。各クエリは配列の異なる領域に書き込むため、ターゲット配列への直接書き込みはスレッド セーフです。

これにより、行の範囲のみを分割する戦略が維持されます。行のサブ分割は可能ですが、より複雑であり、成果が得られることはめったにありません。これは、呼び出し元が非常に少ない数の行を要求しているが、行ごとに多くの値を要求しているというまれなケースでのみ役立ちます。しかし、それでも、メモリの局所性の問題により、複雑なサブ行分割がうまくいくかどうかは明らかではありません。


元の質問に関して、ForkJoinTask直接実装する場合は、getSurplusQueuedTaskCount()再度分割するか、直接計算するかを決定するために使用できます。

しきい値の選択は、同期する必要があるタスク オブジェクトの数によるオーバーヘッドとコア使用率の間のトレードオフです。ワークロードを完全にバランスよく分割でき、他の無関係なスレッドやプロセスが CPU 時間を使用しない場合は、コアごとに 1 つのアイテムを使用するのが最適です。しかし実際には、タスクがまったく同じ時間に実行されることはないため、最初に終了したコアによって実行される予備の分割タスクがあることが望ましいです。典型的なしきい値は 1 から 3 の間にあります (これはコアあたりのキューに入れられたタスクの数であることを思い出してください)。ワークロードが非常に均等な種類のタスクの場合、より小さい数を使用できます。たとえば、別のキュー項目がある場合は分割を停止します。

于 2016-12-01T11:47:09.977 に答える