5

簡単に並列化できる作業が少しあります。Javaスレッドを使用して、作業を4つのコアマシンに分割したいと思います。これは、巡回セールスマン問題に適用される遺伝的アルゴリズムです。簡単に並列化できるようには聞こえませんが、最初のループは非常に簡単に並列化できます。実際の進化について話す2番目の部分はそうである場合とそうでない場合がありますが、スレッドの実装方法が原因で速度が低下しているのか、それともアルゴリズム自体が遅くなっているのかを知りたいです。

また、私がやろうとしていることをどのように実行すべきかについて誰かがより良いアイデアを持っているなら、それは非常にありがたいです。

main()には、次のものがあります。

 final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(numThreads*numIter);
 ThreadPoolExecutor tpool = new ThreadPoolExecutor(numThreads, numThreads, 10, TimeUnit.SECONDS, queue);
 barrier = new CyclicBarrier(numThreads);
 k.init(tpool);

init()内で実行されるループがあり、次のようになります。

for (int i = 0; i < numCities; i++) {
    x[i] = rand.nextInt(width);
    y[i] = rand.nextInt(height);
}

私がこれに変更したこと:

int errorCities = 0, stepCities = 0;
stepCities = numCities/numThreads;
errorCities = numCities - stepCities*numThreads;

// Split up work, assign to threads                                                                        
for (int i = 1; i <= numThreads; i++) {
    int startCities = (i-1)*stepCities;
    int endCities = startCities + stepCities;

    // This is a bit messy...                                                                              
    if(i <= numThreads) endCities += errorCities;
    tpool.execute(new citySetupThread(startCities, endCities));
}

そしてここにcitySetupThread()クラスがあります:

public class citySetupThread implements Runnable {
    int start, end;

    public citySetupThread(int s, int e) {
        start = s;
        end = e;
    }
    public void run() {
        for (int j = start; j < end; j++) {
            x[j] = ThreadLocalRandom.current().nextInt(0, width);
            y[j] = ThreadLocalRandom.current().nextInt(0, height);
        }

        try {
            barrier.await();
        } catch (InterruptedException ie) {
            return;
        } catch (BrokenBarrierException bbe) {
            return;
        }
    }
}

上記のコードはプログラム内で1回実行されるため、スレッド構造のテストケースのようなものでした(これはJavaスレッドでの初めての経験です)。私は実際のクリティカルセクション、特に遺伝的アルゴリズムの進化の部分に同じ種類のものを実装しました。そのクラスは次のとおりです。

public class evolveThread implements Runnable {
    int start, end;

    public evolveThread(int s, int e) {
        start = s;
        end = e;
    }
    public void run() {
        // Get midpoint                                                                                            
        int n = population.length/2, m;

        for (m = start; m > end; m--) {
            int i, j;
            i = ThreadLocalRandom.current().nextInt(0, n);

            do {
                j = ThreadLocalRandom.current().nextInt(0, n);
            } while(i == j);

            population[m].crossover(population[i], population[j]);
            population[m].mutate(numCities);
        }

        try {
            barrier.await();
        } catch (InterruptedException ie) {
            return;
        } catch (BrokenBarrierException bbe) {
            return;
        }

    }
}

これは、次のようにinit()で呼び出される関数evolve()に存在します。

for (int p = 0; p < numIter; p++) evolve(p, tpool);

はい、それはひどく良いデザインではないことを私は知っていますが、他の理由で私はそれに固執しています。進化の内部には、ここに示されている関連部分があります。

// Threaded inner loop                                                                                     
int startEvolve = popSize - 1,
endEvolve = (popSize - 1) - (popSize - 1)/numThreads;

// Split up work, assign to threads                                                                        
for (int i = 0; i < numThreads; i++) {
    endEvolve = (popSize - 1) - (popSize - 1)*(i + 1)/numThreads + 1;
    tpool.execute(new evolveThread(startEvolve, endEvolve));
    startEvolve = endEvolve;
}

// Wait for our comrades                                                                                   
try {
     barrier.await();
} catch (InterruptedException ie) {
     return;
} catch (BrokenBarrierException bbe) {
     return;
}

population[1].crossover(population[0], population[1]);
population[1].mutate(numCities);
population[0].mutate(numCities);

// Pick out the strongest                                                                                      
Arrays.sort(population, population[0]);
current = population[0];
generation++;

私が本当に知りたいのはこれです:

  • 「キュー」にはどのような役割がありますか?プール内のすべてのスレッドに対して実行されると思う数のジョブのキューを作成する権利はありますか?サイズが十分に大きくない場合、RejectedExecutionExceptionが発生します。numThreads * numIterationsを実行することにしたのは、それが(前述の実際のevolutionメソッドの場合)ジョブの数になるためです。それは奇妙ですが..barrier.await()が機能していれば、これを行う必要はありません。

  • 私はbarrier.await()を正しく使用していますか?現在、Runnableオブジェクトのrun()メソッド内と、すべてのジョブを実行するforループの後の2か所にあります。必要なのは1つだけだと思っていたのですが、どちらかを削除するとエラーが発生します。

  • 私はスレッドの競合を疑っています。それは、(入力パラメーターに比例する)不条理な速度低下から収集できる唯一のことだからです。スレッドプールとバリアをどのように実装しているかに関係があるかどうかを知りたいです。そうでない場合は、crossover()メソッドとmutate()メソッドの内部を調べる必要があると思います。

4

2 に答える 2

4

まず、CyclicBarrierの使用方法にバグがあると思います。現在、パーティの数としてエグゼキュータスレッドの数を使用して初期化しています。ただし、追加のパーティがあります。メインスレッド。だから私はあなたがする必要があると思います:

barrier = new CyclicBarrier(numThreads + 1);

これはうまくいくはずだと思いますが、個人的にはバリアの奇妙な使い方だと思います。

ワーカーキュースレッドプールモデルを使用する場合、セマフォまたはJavaのFutureモデルを使用する方が簡単です。

セマフォの場合:

class MyRunnable implements Runnable {
  private final Semaphore sem;

  public MyRunnable(Semaphore sem) {
    this.sem = sem;
  }

  public void run() {
    // do work

    // signal complete
    sem.release()
  }
}

次に、メインスレッドで:

Semaphore sem = new Semaphore(0);

for (int i = 0; i < numJobs; ++i) {
  threadPool.execute(new MyRunnable(sem));
}

sem.acquire(numJobs);

それは実際にはバリアと同じことをしますが、メインスレッドと再び「同期」するのではなく、ワーカータスクが実行されることを「シグナリング」することを考える方が簡単です。

たとえば、CyclicBarrier JavaDocのサンプルコードを見ると、呼び出しbarrier.await()はワーカー内のループ内にあります。したがって、実際には複数の長時間実行されているワーカースレッドを同期しており、メインスレッドはバリアに参加していません。ループ外のワーカーの最後で呼び出すbarrier.await()と、完了のシグナルが増えます。

于 2012-12-02T01:25:16.470 に答える
1

タスクの数を増やすと、各タスクの追加を使用してオーバーヘッドが増加します。これは、タスクの数を最小限に抑えたい、つまり、CPUの数と同じにしたいことを意味します。2倍の数を使用する一部のタスクでは、作業負荷が均等でない場合にCPUの数を増やすことができます。

ところで:各タスクに障壁は必要ありません。各タスクを呼び出すことで、各タスクの将来が完了するのを待つことができますget()

于 2012-12-02T11:28:23.703 に答える