3

Phaser を使用していつ作業を終了するかを知るマルチスレッド アプリケーションを作成しています。問題は、ExecutorCompletionService ではキューに 100k のスレッドが存在する可能性があることですが、Phaser での未到着パーティの最大数は 65535 です。65536 パーティが到着した場合、どうすればよいですか?

私のコード例:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) throws Exception {
    ExecutorService ec = Executors.newFixedThreadPool(10);
    ExecutorCompletionService<List<String>> ecs = new ExecutorCompletionService<List<String>>(
            ec);
    Phaser phaser = new Phaser();

    // register first node/thread
    ecs.submit(new SimpleParser("startfile.txt"));
    phaser.register();

    Future<List<String>> future;
    do {
        future = ecs.poll();
        if(future!=null && future.get() != null) {
            addParties(phaser, future.get(), ecs);
            phaser.arriveAndDeregister();
        }

        if (phaser.isTerminated()) {
            ec.shutdown();
        }
    } while (!ec.isShutdown() && !phaser.isTerminated());
}

public static void addParties(Phaser p, List<String> filenames,
        ExecutorCompletionService<List<String>> ecs) {
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        //PROBLEM = What to do when Phaser has 65535+ unarrived parties
        p.register();
    }
}

static class SimpleParser implements Callable<List<String>> {

    String fileName;

    public SimpleParser(String fileName) {
        this.fileName = fileName;
    }

    @Override
    public List<String> call() throws Exception {
        return parseFile();
    }

    private List<String> parseFile() {
        return new ArrayList<String>(Arrays.asList(new String[] {
                "somefilename1.txt", "somefilename2.txt" }));
    }

}
}

問題は addParties() メソッドにあります。シングル スレッド (SimpleParser) は 100 個の新しいファイル名を返すことができ、100 個の新しいスレッドが ExecutorCompletionService に送信され、100 個の新しいパーティが Phaser に登録されます。私はこのようなものを使用しようとしました:

if(p.getUnarrivedParties() == 65535)
            p = new Phaser(p);

フェイザーのチェーンを作成しますが、p.getUnarrivedParties()が0を返すため、役に立ちませんでしたが、次のパーティーを登録できません...

    System.out.println(p.getUnarrivedParties());
        if(p.getUnarrivedParties() == 65535) {
            p = new Phaser(p);
            System.out.println(p.getUnarrivedParties());
        }
        p.register();

プリント:

65535

0

IllegalStateException をスローします

では、この古いものに接続される新しい Phaser を作成するにはどうすればよいでしょうか?

//編集

ありがとう@ボウモア。あと2つだけ質問があります。

例を見てみましょう:

import java.util.concurrent.Phaser;

public class Test2 {
    public static void main(String[] args) {
        Phaser parent = new Phaser();
        Phaser child1 = new Phaser(parent);
        Phaser child2 = new Phaser(parent);
        child1.register();
        child2.register();

        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child1.isTerminated()+"\n");

        child1.arriveAndDeregister();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");

        child2.arriveAndDeregister();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");
    }
}

それは印刷します:

Parent: false
Child1: false
Child2: false

Parent: false
Child1: false
Child2: false

Parent: true
Child1: true
Child2: true

なぜ後 child1.arriveAndDeregister(); child1 は終了していませんが、実際に終了しているかどうかを確認する方法は?

2 番目の質問です。何千もの新しいオブジェクトを作成するのは無意味だと思ったので、65535 パーティーに達した後、新しい Phaser を作成することについて尋ねました。これでメモリの問題は発生しないと思いますか、それともパフォーマンスを向上させることさえできると思いますか?

4

1 に答える 1

3

Phaser既存の新しいプロセスに登録する代わりにPhaser、オリジナルの新しく作成された子プロセスに登録できます。子の作成は、子のコンストラクターにPhaser親を提供するだけで行われます。Phaser

public static void addParties(Phaser p, List<String> filenames,
                              ExecutorCompletionService<List<String>> ecs) {
    Phaser newPhaser = new Phaser(p);
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        newPhaser.register();
    }
}

特定のしきい値に達した場合にのみ子 Phaser を作成する場合は、未到着のパーティーの数よりも登録済みのパーティーの数を確認できます。

public static void addParties(Phaser p, List<String> filenames, ExecutorCompletionService<List<String>> ecs) {
    Phaser toRegister = p.getRegisteredParties() > THRESHOLD ? new Phaser(p) : p;
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        //PROBLEM = What to do when Phaser has 65535+ unarrived parties
        toRegister.register();
    }
    System.out.println(p.getRegisteredParties());
}

編集 :

質問 1 をフォローアップするには: ChildPhaserは終了状態を root と共有しますPhaserisTerminated()

public boolean isTerminated() {
    return root.state < 0L;
}

質問 2 の補足として、親 Phaser は実際には子 Phaser への参照を保持していません。子フェーザーが参照されなくなると、ガベージ コレクションの対象になります。javadoc にあるアドバイスに従うのが最善です:

TASKS_PER_PHASER の最適な値は、主に予想される同期率に依存します。フェーズごとのタスク本体が非常に小さい (したがってレートが高い) 場合は 4 程度の値が適切であり、非常に大きい場合は数百までの値が適切です。

階層化の主な理由は、重い同期の競合を減らすことです。そのため、軽量のタスクの場合は、フェーザーあたりのタスクが少ない方がよいでしょう。これらを微調整するためにさまざまな設定をプロファイリングすることは決して悪いことではありません。

于 2012-12-30T20:46:11.103 に答える