Phaser を使用していつ作業を終了するかを知るマルチスレッド アプリケーションを作成しています。問題は、ExecutorCompletionService ではキューに 100k のスレッドが存在する可能性があることですが、Phaser での未到着パーティの最大数は 65535 です。65536 パーティが到着した場合、どうすればよいですか?
私のコード例:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws Exception {
ExecutorService ec = Executors.newFixedThreadPool(10);
ExecutorCompletionService<List<String>> ecs = new ExecutorCompletionService<List<String>>(
ec);
Phaser phaser = new Phaser();
// register first node/thread
ecs.submit(new SimpleParser("startfile.txt"));
phaser.register();
Future<List<String>> future;
do {
future = ecs.poll();
if(future!=null && future.get() != null) {
addParties(phaser, future.get(), ecs);
phaser.arriveAndDeregister();
}
if (phaser.isTerminated()) {
ec.shutdown();
}
} while (!ec.isShutdown() && !phaser.isTerminated());
}
public static void addParties(Phaser p, List<String> filenames,
ExecutorCompletionService<List<String>> ecs) {
for (int i = 0; i < filenames.size(); i++) {
ecs.submit(new SimpleParser(filenames.get(i)));
//PROBLEM = What to do when Phaser has 65535+ unarrived parties
p.register();
}
}
static class SimpleParser implements Callable<List<String>> {
String fileName;
public SimpleParser(String fileName) {
this.fileName = fileName;
}
@Override
public List<String> call() throws Exception {
return parseFile();
}
private List<String> parseFile() {
return new ArrayList<String>(Arrays.asList(new String[] {
"somefilename1.txt", "somefilename2.txt" }));
}
}
}
問題は addParties() メソッドにあります。シングル スレッド (SimpleParser) は 100 個の新しいファイル名を返すことができ、100 個の新しいスレッドが ExecutorCompletionService に送信され、100 個の新しいパーティが Phaser に登録されます。私はこのようなものを使用しようとしました:
if(p.getUnarrivedParties() == 65535)
p = new Phaser(p);
フェイザーのチェーンを作成しますが、p.getUnarrivedParties()が0を返すため、役に立ちませんでしたが、次のパーティーを登録できません...
System.out.println(p.getUnarrivedParties());
if(p.getUnarrivedParties() == 65535) {
p = new Phaser(p);
System.out.println(p.getUnarrivedParties());
}
p.register();
プリント:
65535
0
IllegalStateException をスローします
では、この古いものに接続される新しい Phaser を作成するにはどうすればよいでしょうか?
//編集
ありがとう@ボウモア。あと2つだけ質問があります。
例を見てみましょう:
import java.util.concurrent.Phaser;
public class Test2 {
public static void main(String[] args) {
Phaser parent = new Phaser();
Phaser child1 = new Phaser(parent);
Phaser child2 = new Phaser(parent);
child1.register();
child2.register();
System.out.println("Parent: "+parent.isTerminated());
System.out.println("Child1: "+child1.isTerminated());
System.out.println("Child2: "+child1.isTerminated()+"\n");
child1.arriveAndDeregister();
System.out.println("Parent: "+parent.isTerminated());
System.out.println("Child1: "+child1.isTerminated());
System.out.println("Child2: "+child2.isTerminated()+"\n");
child2.arriveAndDeregister();
System.out.println("Parent: "+parent.isTerminated());
System.out.println("Child1: "+child1.isTerminated());
System.out.println("Child2: "+child2.isTerminated()+"\n");
}
}
それは印刷します:
Parent: false
Child1: false
Child2: false
Parent: false
Child1: false
Child2: false
Parent: true
Child1: true
Child2: true
なぜ後 child1.arriveAndDeregister(); child1 は終了していませんが、実際に終了しているかどうかを確認する方法は?
2 番目の質問です。何千もの新しいオブジェクトを作成するのは無意味だと思ったので、65535 パーティーに達した後、新しい Phaser を作成することについて尋ねました。これでメモリの問題は発生しないと思いますか、それともパフォーマンスを向上させることさえできると思いますか?