3

Java で Phaser を理解しようとしています。他のパーティーが到着するのを待って事前に立ち往生している例を書きました。

私が理解している限り、フェーザーは再利用可能なスレッド同期 (再利用できない CountdownLatch とは異なり) バリア アクションを備えたバリアとして使用されます (状態を共有するために使用される Cyclicbarrier とは異なり、Phaser はバリア アクションで状態を共有する必要はありません)。私が間違っている場合は修正してください。

したがって、私の例では、特定の数のパーティ/スレッドがバリアに到達した後、各スレッドでランダムな加算および減算コードを実行しようとしています。私は何を間違っていますか?

import static java.lang.String.*;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.stream.IntStream;

public class PhaserUsage implements Callable<String> {

    private static final int THREAD_POOL_SIZE = 10;
    private final Phaser phaser;

    private PhaserUsage(Phaser phaser) {
        this.phaser = phaser;
    }

    public static void main(String a[]) {
        ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        CompletionService<String> completionService = new ExecutorCompletionService<>(execService);

        Phaser phaser = new Phaser(1);
        IntStream.range(0, THREAD_POOL_SIZE)
                .forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));

        execService.shutdown();

         try {
             while (!execService.isTerminated()) {
                String result = completionService.take().get();
                System.out.println(format("Result is: %s", result));
             }
          } catch (ExecutionException | InterruptedException e) {
             e.printStackTrace();
          }
    }

    @Override
    public String call() {
        String threadName = Thread.currentThread().getName();
        System.out.println(format("Registering...%s",threadName));
        phaser.register();
        System.out.println(format("Arrive and await advance...%s",threadName));
        phaser.arriveAndAwaitAdvance(); // await all creation
        int a = 0, b = 1;
        Random random = new Random();
        for (int i = 0; i < random.nextInt(10000000); i++) {
            a = a + b;
            b = a - b;
        }
        System.out.println(format("De-registering...%s",threadName));
        phaser.arriveAndDeregister();
        return format("Thread %s results: a = %s, b = %s", threadName, a, b);
    }
}
4

3 に答える 3

3

問題は、登録されているタスク内から呼び出すことができないことです。phaser.register()フェイザーを使用するときは、常に次の 2 つのルールに従います。

  1. 登録されたタスクのみが他のタスクを登録できます。これは、タスクがそれ自体を登録できないことを意味します。
  2. 登録されたすべてのタスクは、終了する前に登録を解除する必要があります。finally最後に登録解除するブロックの周りにフェイザーを使用してコードをラップすることをお勧めします(例を参照)。

修正したプログラムは次のとおりです (フェイザーを作成する行に注目してください)。

import static java.lang.String.*;

import java.util.Random;
import java.util.concurrent.*;
import java.util.stream.IntStream;

public class PhaserUsage implements Callable<String> {

    private static final int THREAD_POOL_SIZE = 10;
    private final Phaser phaser;

    private PhaserUsage(Phaser phaser) {
        this.phaser = phaser;
    }

    public static void main(String a[]) {
        ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        CompletionService<String> completionService = new ExecutorCompletionService<>(execService);

        // since we know beforehand how many tasks we have, initialize the
        // number of participants in the constructor; other wise register
        // *before* launching the task
        Phaser phaser = new Phaser(THREAD_POOL_SIZE);

        IntStream.range(0, THREAD_POOL_SIZE)
                .forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));

        execService.shutdown();

         try {
             while (!execService.isTerminated()) {
                String result = completionService.take().get();
                System.out.println(format("Result is: %s", result));
             }
          } catch (ExecutionException | InterruptedException e) {
             e.printStackTrace();
          }
    }

    @Override
    public String call() {
        String threadName = Thread.currentThread().getName();
        System.out.println(format("Arrive and await advance...%s",threadName));
        phaser.arriveAndAwaitAdvance(); // await all creation
        int a = 0, b = 1;
        Random random = new Random();
        for (int i = 0; i < random.nextInt(10000000); i++) {
            a = a + b;
            b = a - b;
        }
        System.out.println(format("De-registering...%s",threadName));
        phaser.arriveAndDeregister();
        return format("Thread %s results: a = %s, b = %s", threadName, a, b);
    }
}
于 2016-06-06T22:51:52.340 に答える