3

私は fork/join で遊んでいて、次の例を考えました:

App1: 2 つの for ループでいくつかの乱数を生成して anArrayListに渡しますfork MyTask(Fork): を反復処理しArrayListsてすべての数値を合計し、値を返します。

import java.util.ArrayList;
import java.util.concurrent.ForkJoinPool;

public class App1 {

    ArrayList list = new ArrayList();

    static final ForkJoinPool mainPool = new ForkJoinPool();

    public App1() {
        for (int i = 0; i < 10; i++) {
            list.clear();
            for (int j = 1000; j <= 100000; j++) {
                int random = 1 + (int)(Math.random() * ((100 - 1) + 1));
                list.add(random);
            }
            mainPool.invoke(new MyTask(list));
        }
        // At the end showing all results
        // System.out.println (result1 + result2 + result3...);
    }

    public static void main(String[] args) {
        App1 app = new App1();
    }
}


import java.util.ArrayList;
import java.util.concurrent.RecursiveTask;

public class MyTask extends RecursiveTask<Integer> {

    ArrayList list = new ArrayList();
    int result;

    public MyTask(ArrayList list) {
        this.list = list;
    }

    @Override
    protected Integer compute() {
        for(int i=0; i<=list.size(); i++){
            result += (int)list.get(i); // adding up all numbers
        }
        return result;
    }
}

正しい軌道に乗っているかどうかはわかりません。また、フォークからすべての結果を収集する方法もわかりません。
誰でも私のコードを見てもらえますか?

4

4 に答える 4

1

コードは問題ないように見えますが、ForkJoinPool を間違った方法で使用しています。これは、独立したサブタスクに分割でき、マルチスレッド化によって高速化できるタスク用のツールです。

あなたのタスクはおそらくマルチスレッドの恩恵を受けるには十分な大きさではありませんが、それは学習課題であるため脇に置いておいて、メインタスクをサブタスクに分割する必要がありますが、配列全体を数えるだけです。一度。

コードでできること:

  • より適切なマルチスレッドの別のモードを使用します。

  • タスクをフォークして配列内の開始位置と終了位置を渡し、それらのサブタスクが完了したら結果を合計します。

おそらく後者に興味があるので、これを行う方法の例を次に示します。

public class MyTask extends RecursiveTask<Integer> {

  final ArrayList<Integer> list;
  final int start, end;

  public MyTask(ArrayList<Integer> list, int start, int end) {
    this.list = list;
    this.start = start;
    this.end = end;
  }

  @Override
  protected Integer compute() {
    if (end - start > 10) { // is this task big enough to justify more threading?
      final int half = (end + start) / 2;
      final MyTask firstHalf = new MyTask(list, start, half);
      final MyTask secondHalf = new MyTask(list, half+1, end);
      invokeAll(firstHalf, secondHalf);
      return firstHalf.get() + secondHalf.get();
    } else {
      int result = 0;
      for(int i=start; i<=end; i++){
        result += list.get(i); 
      }
      return result;
    }
  }
}
于 2013-11-12T10:10:15.840 に答える
0

あなたはRecursiveTask正しくない方法を使用しています。タスクを再帰的に呼び出す必要があります。以下の可能な解決策を見てください。

したがって、最終的にすべての作業は< THRESHOLD再帰的に部分に分割されます。

アップデート:

Java Concurrent Animatedを参照してください。jar をダウンロードして実行するだけで、どのように機能するかがわかりやすく視覚的に説明されています。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class RecursiveListSum {

  private static class RecursiveSum extends RecursiveTask<Long> {
    private static final long serialVersionUID = 1L;
    private static final int THRESHOLD = 1000;
    private final List<Integer> list;
    private final int begin;
    private final int end;

    public RecursiveSum(List<Integer> list, int begin, int end) {
      super();
      this.list = list;
      this.begin = begin;
      this.end = end;
    }

    @Override
    protected Long compute() {
      final int size = end - begin;
      // if the work to be done is below some threshold, just compute directly.
      if (size < THRESHOLD) {
        long sum = 0;
        for (int i = begin; i < end; i++)
          sum += list.get(i);
        return sum;
      } else {
        // split the work to other tasks - recursive (that's why it is called recursive task!)
        final int middle = begin + ((end - begin) / 2);
        RecursiveSum sum1 = new RecursiveSum(list, begin, middle);
        // invoke the first portion -> will be invoked in thread pool
        sum1.fork();
        RecursiveSum sum2 = new RecursiveSum(list, middle, end);
        // now do a blocking! compute on the second task and wait for the result of the first task.
        return sum2.compute() + sum1.join();
      }
    }
  }

  public static void main(String[] args) {
    // First fill the list
    List<Integer> list = new ArrayList<>();
    long expectedSum = 0;
    for (int i = 0; i < 10000; i++) {
      int random = 1 + (int) (Math.random() * ((100 - 1) + 1));
      list.add(random);
      expectedSum += random;
    }
    System.out.println("expected sum: " + expectedSum);

    // now let the RecursiveTask calc the sum again.
    final ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
    final RecursiveSum recursiveSum = new RecursiveSum(list, 0, list.size());
    long recSum = forkJoinPool.invoke(recursiveSum);
    System.out.println("recursive-sum: " + recSum);
  }

}
于 2013-11-12T10:16:13.490 に答える
0

の結果を使用するだけですinvoke

Integer result = mainPool.invoke(new MyTask(list));
System.out.println(i + "\t" + result);
于 2013-11-12T10:09:26.903 に答える
0

何かを返す必要があるかどうかにかかわらず、必要に応じRecursiveActionて拡張する必要があります。RecursiveTask

両方のクラスで、次の関数をオーバーライドする必要があります。

protected abstract void compute();//in RecursiveAction
protected abstract V compute(); //in RecursiveTask

以下は、変更されたクイックソートです。これを守って、自分自身を実装してみてください:

public class ForkJoinQuicksortTask extends RecursiveAction {
    static final int SEQUENTIAL_THRESHOLD = 10000;

    private final int[] a;
    private final int left;
    private final int right;

    public ForkJoinQuicksortTask(int[] a) {
        this(a, 0, a.length - 1);
    }

    private ForkJoinQuicksortTask(int[] a, int left, int right) {
        this.a = a;
        this.left = left;
        this.right = right;
    }

    @Override
    protected void compute() {
        if (serialThresholdMet()) {
            Arrays.sort(a, left, right + 1);
        } else {
            int pivotIndex = partition(a, left, right);
            ForkJoinQuicksortTask  t1 = new ForkJoinQuicksortTask(a, left, pivotIndex-1);
            ForkJoinQuicksortTask t2 = new ForkJoinQuicksortTask(a, pivotIndex + 1, right);
            t1.fork();
            t2.compute();
            t1.join();
        }
    }
    int partition(int[] a, int p, int r){
        int i=p-1;
        int x=a[r];
        for(int j=p;j<r;j++){
            if(a[j]<x){
                i++;
                swap(a, i, j);
            }
        }
        i++;
        swap(a, i, r);
        return i;
    }

    void swap(int[] a, int p, int r){
        int t=a[p];
        a[p]=a[r];
        a[r]=t;
    }

    private boolean serialThresholdMet() {
        return right - left < SEQUENTIAL_THRESHOLD;
    }
    public static void main(String[] args){
        ForkJoinPool fjPool = new ForkJoinPool();
        int[] a=new int[3333344];
        for(int i=0;i<a.length;i++){
            int k=(int)(Math.random()*22222);
            a[i]=k;
        }
        ForkJoinQuicksortTask forkJoinQuicksortTask=new ForkJoinQuicksortTask(a, 0, a.length-1);
        long start=System.nanoTime();
        fjPool.invoke(forkJoinQuicksortTask);
        System.out.println("Time: "+ (System.nanoTime()-start));
    }
}
于 2013-11-12T10:09:43.530 に答える