0

私は、それぞれがリソース集約的で、それぞれ何百万もの入力で処理する必要があるさまざまなアルゴリズムを持っています。入力をブロックに分割し、ブロックを並列処理し、最終的に結果を単一の出力配列に正しい順序で組み立てたいと思います。

私はこの問題について調査を行ってきましたが、コンセンサスは、 と を使用する必要があるようExecutorServiceですarraycopy()。ただし、作成するスレッドの最適な数を決定する方法がわかりません。また、バグのリスクを排除する方法でコードを構成する方法もわかりません。結果の配列を作成した後、各スレッドが終了したことを知っていればよいでしょう。最後に、以下に記述したコードでもヌル ポインター エラーが発生します。

バグのリスクを排除しながら、上記の目標をできるだけ早く達成できるように、以下のコードを編集してください。以下のコードが 5 ミリ秒または 10 ミリ秒で実行できると便利です。配列内の乱数は、スレッド オプションを比較するためのベンチマークとして機能するプレースホルダーにすぎません。実際のアルゴリズムは乱数生成とは関係がないため、乱数生成を最適化する必要はありません。これが私の進行中の作業です:

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelArrays {
    private static final int numJobs = 5;
    static int numElements = 1473200;
    static int blockSize = 300000;
    static int remaining;
    String number;
    static int[][] data2D = new int[numJobs][];
    static int[] data1D;
    static int size;
    static int currIdx;

    public static void main(String args[]) {
        long startTime = System.currentTimeMillis();
        remaining = numElements-blockSize;
        // create a pool of threads, 10 max jobs will execute in parallel
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        // submit jobs to be executing by the pool
        for (int i = 0; i < numJobs; i++) {
            currIdx = i;
            System.out.println("This coming iteration would leave us with remaining, blockSize: "+remaining+", "+blockSize);
            if(remaining>=0){System.out.println("blockSize is: "+blockSize);}
            else{
                blockSize = (blockSize+remaining);
                remaining = 0;
                System.out.println("else blockSize is: "+blockSize);
            }
            System.out.println("After iteration, remaining, blockSize are: "+remaining+", "+blockSize);
            threadPool.submit(new Runnable() {
                public void run() {
                    Random r = new Random();
                    data2D[currIdx] = new int[blockSize];
                    for(int j=0;j<data2D[currIdx].length;j++){
                        data2D[currIdx][j] = r.nextInt(255)*r.nextInt(255)*r.nextInt(255);
                    }
                }
            });
            remaining -= blockSize;
        }
        //Now collapse data2D into a 1D array
        data1D = new int[numElements];
        int startPos = 0;
        for(int k=0;k<numJobs;k++){
            System.out.println("startPos is: "+startPos);
            //arraycopy(Object src, int srcPos, Object dest, int destPos, int length);
            System.out.println("k is: "+k);
            System.out.println("data2D[k].length is: "+data2D[k].length);
            System.arraycopy(data2D[k], 0, data1D, startPos, data2D[k].length);
            startPos += data2D[k].length;
        }
        threadPool.shutdown();
        System.out.println("Main thread exiting.");
        long endTime = System.currentTimeMillis();
        System.out.println("Elapsed time is: "+(endTime-startTime));
    }
}  

2番目の編集:

Ralf H の提案に応えて、次のようにコードを編集しました。同じヌル ポインター例外をまだスローしています。null ポインター例外をスローせずに正しく実行されるように、このコードを書き直してください。

package myPackage;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelArrays {
    private final static int numJobs = 5;
    static int numElements = 1473200;
    static int blockSize = 300000;
    static int remaining;
    String number;
    static int[][] data2D = new int[numJobs][];
    static int[] data1D;
//  static int size;
    static int currIdx;
    static int numAdded = 0;

    public static void main(String args[]) {runAlgorithm();}

    static void runAlgorithm(){
        long startTime = System.currentTimeMillis();
        remaining = numElements-blockSize;
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < numJobs; i++) {// submit jobs to be executing by the pool
            currIdx = i;
            if(remaining<0){//last block will be smaller than the rest
                blockSize = (blockSize+remaining);
                remaining = 0;
            }
            final int fCurrIdx = i;
            threadPool.submit(new Runnable() {
                public void run() {
                    Random r = new Random();
                    data2D[fCurrIdx] = new int[blockSize];
                    System.out.println("fCurrIdx is: "+fCurrIdx);
                    for(int j=0;j<data2D[fCurrIdx].length;j++){
                        data2D[fCurrIdx][j] = r.nextInt(255)*r.nextInt(255)*r.nextInt(255);
                    }
                    numAdded += 1;
                }
            });
            remaining -= blockSize;
        }
        //Now collapse data2D into a 1D array
        data1D = new int[numElements];
        System.out.println("numAdded, data2D.length is: "+numAdded+", "+data2D.length);
        int startPos = 0;
        for(int k=0;k<numJobs;k++){
            System.out.println("startPos is: "+startPos);
            //arraycopy(Object src, int srcPos, Object dest, int destPos, int length);
            System.out.println("k, data2D["+k+"].length are: "+k+", "+data2D[k].length); // NullPointerException here
            System.arraycopy(data2D[k], 0, data1D, startPos, data2D[k].length);
            startPos += data2D[k].length;
        }
        threadPool.shutdown();
        System.out.println("Main thread exiting.");
        long endTime = System.currentTimeMillis();
        System.out.println("Elapsed time is: "+(endTime-startTime));
    }
}  

改訂されたコードによってスローされた null ポインター エラーのスタック トレースを次に示します。

Exception in thread "main" java.lang.NullPointerException  
    at myPackage.ParallelArrays.runAlgorithm(ParallelArrays.java:52)  
    at myPackage.ParallelArrays.main(ParallelArrays.java:19)  

問題は、コードが Future オブジェクトとExecutorService. しかし、この特定のコードの構文はわかりません。

4

3 に答える 3

2

このタスクには ForkJoinPool の方が適していると思います。効率的な並列処理用に設計されています。 http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.htmlを参照してください。

于 2013-06-30T03:53:31.920 に答える
2

最近、同じ種類の問題に遭遇しました。それらは、同じ非同期変数を使用したタスク間の同期が原因でした(他の人がそれを読んでいるためにコメントが残されています:))。

あなたの場合、@Ralf が述べたように、プールの終了を待っていません。したがって、配列はまだ( for all ) でdata2D満たされているため、 を実行すると NPE が発生します。nulldata2D[k] == nullkdata2D[k].length

コードの 2 番目のバージョンを実行しようとしましたが、10 回実行した後、NPE がスローされることがあります。呼び出すと消えましたawaitTermination()

threadPool.shutdown();
try {
    while (!threadPool.awaitTermination(1, TimeUnit.SECONDS)) ;
} catch (InterruptedException e) {
    e.printStackTrace();
}

//Now collapse data2D into a 1D array
data1D = new int[numElements];
...
于 2013-08-09T06:07:00.727 に答える