3

私は並列処理と同時実行にかなり慣れていないので、Java で Fork-Join を使用してメディアン フィルター アルゴリズムを実装しようとしています。基本的に、入力ファイルを ArrayList に読み込み、そのリストを使用して、フィルター処理された中央値 (元の ArrayList の最初と最後の要素を含む) の新しい ArrayList を生成します。

これで、アルゴリズムのシリアル/シーケンシャル バージョンを作成することができ、正常に動作しました。ただし、Fork-Join バージョンを作成しようとすると、大きな ArrayLists(100000+) では機能しないようです。サイズ 5 の非常に小さな ArrayList で試してみましたが、問題なく動作します。エラーを見つけることができないようです (論理エラーおよび/または実装エラーであると確信しています)。どんな助けでも大歓迎です。

シーケンシャル アルゴリズムのスニペットは次のとおりです。

    //Add first boundary element to output ArrayList
    outputElements.add(this.elements.get(0));

    //Start Filter Algorithm 
    while(elements.size()-counter >= filterSize){
        for(int i = 0; i<filterSize; i++){
            tempElements.add(this.elements.get(i+counter));
            if(i==filterSize){
                break;
            }
        }

        Collections.sort(tempElements);
        outputElements.add(tempElements.get((filterSize-1)/2));

        counter++;
        tempElements.clear();
    }

    //Add last boundary element to output ArrayList.
    if (elements != null && !elements.isEmpty()) {
        outputElements.add(elements.get(elements.size()-1));
    }//End Filter Algorithm

これが私が作ったParallel Classです。これは機能していない部分です:

public class Parallel extends RecursiveTask<List<Float>>{
    int lo;
    int hi;
    int filterSize;
    String outFile; //Output file name.
    int arraySize;
    List<Float> elements = new ArrayList<Float>();
    List<Float> tempElements = new ArrayList<Float>();
    List<Float> outputElements = new ArrayList<Float>();
    int counter = 0;
    static final int SEQUENTIAL_CUTOFF=1000;

    public Parallel(List<Float> elements, int filterSize, String outFile, int lo, int hi) {
        this.lo = lo;
        this.hi = hi;
        this.elements = elements;
        this.outFile = outFile;
        this.filterSize = filterSize;       
        if(lo == 0){
            outputElements.add(this.elements.get(0));
        }
    }
    @Override
    protected List<Float> compute() {
        long startTime = System.nanoTime(); //Algorithm starts here 
        if((hi-lo) < SEQUENTIAL_CUTOFF) {
            while(hi-counter >= filterSize){
                for(int i = lo; i<filterSize; i++){
                    tempElements.add(this.elements.get(i+counter));
                    if(i==filterSize){
                        break;
                    }
                }               
                Collections.sort(tempElements);
                outputElements.add(tempElements.get((filterSize-1)/2));
                counter++;
                tempElements.clear();
                return outputElements;
            }
          }else{              
              Parallel left = new Parallel(this.elements, this.filterSize, this.outFile, this.lo, ((this.lo + this.hi)/2));
              Parallel right = new Parallel(this.elements, this.filterSize, this.outFile, ((this.hi + this.lo)/2), this.hi);
              left.fork();

              List<Float> leftArr = new ArrayList<Float>();
              List<Float> rightArr = new ArrayList<Float>();

             rightArr =  right.compute();
             leftArr = left.join();

             List<Float> newList = new ArrayList<Float>();
             newList.addAll(leftArr);
             newList.addAll(rightArr);       

          }
        long endTime = System.nanoTime();//Algorithm ends here.

        //Write elements to output file 
        PrintWriter writeOutput = null;
        try {
            writeOutput = new PrintWriter(this.outFile, "UTF-8");
        } catch (FileNotFoundException | UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        writeOutput.println(outputElements.size());//Number of lines
        for(int i=0; i<outputElements.size();i++){
            writeOutput.println(i+1 + " " + outputElements.get(i)); //Each line is written
        }

        writeOutput.close(); //Close when output finished writing.
        System.out.println("Parallel complete");
        return null;
    }
}

どんな助けでも本当に感謝しています。数時間を費やして SO と Google について多くの調査を行った後、これを正しく理解することはできません。

編集: music_coder は、私が直面しているエラーを投稿することを提案しました。それは多くのエラーです:

Exception in thread "main" java.lang.IndexOutOfBoundsException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:536)
    at java.util.concurrent.ForkJoinTask.reportResult(ForkJoinTask.java:596)
    at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:640)
    at java.util.concurrent.ForkJoinPool.invoke(ForkJoinPool.java:1521)
    at main.main(main.java:45)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:635)
    at java.util.ArrayList.get(ArrayList.java:411)
    at Parallel.compute(Parallel.java:44)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:1)
    at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:93)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
    at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
    at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
    at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
4

1 に答える 1

0

ArrayList一般に、スレッドセーフではないため、マルチスレッド コードで s を使用することは避ける必要があります。

この実装は同期されていないことに注意してください。複数のスレッドがインスタンスに同時にアクセスArrayListし、少なくとも 1 つのスレッドがリストを構造的に変更する場合は、外部で同期する必要があります。

あなたが投稿したスニペットには、リストを同時に変更するものは何も表示されませんが、子インスタンスにパスオフしていることがわかります。つまり、少なくとも危険なことを行っていることを意味ますスレッド)。this.elementsParallel

最初のパスとして、コンストラクターを次のように置き換えthis.elements = elements;ます。Parallel

this.elements = Collections.unmodifiableList(elements);

リストをunmodifiableParallelにすることで、コードがリストを変更しようとすると、失敗した時点で明確なエラーが発生することが保証されます。Parallelこれは、元のリストを変更する以外の何かを妨げるものではありませんが、正しく動作elementsしていることを確認するための迅速で簡単な方法です。Parallelを取得した場合UnsupportedOperationExceptionParallelクラスを再設計する必要があります。 をArrayList同時に変更することはできません。

が表示されない場合は、他のUnsupportedOperationException何かがリストを変更しています。それを見つけて削除する必要があります。


リストが同時に変更される原因を突き止めたら、最善の方法を決定することができます。スレッド間でデータを共有するためのすべての「正しい」方法を実行することは、この回答でカバーできる範囲を超えていますが、一般的な経験則を次に示します。

  • 可変データ構造を避ける- Guava のParallelような不変データ構造からのデータのみを処理するようにクラスを設計します。不変データ構造はデフォルトでスレッドセーフです。ImmutableList
  • スレッドセーフなデータ構造を使用する- たとえば、ConcurrentLinkedQueue複数のプロセスが同じデータ構造に読み書きするためのスレッドセーフな方法です。 ConcurrentHashMapは、一般的に使用される別のクラスです。何が必要かは、何をしようとしているのかによって異なりますが、これらは良い出発点です。
  • 同時操作の範囲を最小限に抑えます。同時データ構造を使用する場合でも、一般的な目標は、各タスクを分離して実行し、共有ソースからの読み取りと共有シンクへの書き込みを保存することです。1 つのスレッドにしか見えないオブジェクトに対して、できるだけ多くの作業を行います。
  • 同期-明示的な同期なしでParallel書き込みを行うことに気付きました。outFileこれは危険であり、問​​題が発生する可能性があります (クラッシュまたはさらに悪いデータ破損)。一度に 1 つのスレッドだけがファイルに書き込む必要があります。これは、専用のファイル書き込みスレッドを用意するか、ファイル書き込み操作を明示的に同期することによって行います。
于 2015-09-11T23:28:13.487 に答える