BoundedExecutor
『Java Concurrency in Practice』の実装には奇妙なことがあります。
エグゼキュータでキューに入れられているか実行されているスレッドが十分にある場合、送信スレッドをブロックすることにより、エグゼキュータへのタスク送信を抑制することになっています。
これは実装です(catch句に欠落している再スローを追加した後):
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
@Override public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
}
とを4の範囲でインスタンス化BoundedExecutor
するとExecutors.newCachedThreadPool()
、キャッシュされたスレッドプールによってインスタンス化されるスレッドの数が4を超えることはないと予想されますが、実際にはそうなります。私はこの小さなテストプログラムを入手して、最大11個のスレッドを作成しました。
public static void main(String[] args) throws Exception {
class CountingThreadFactory implements ThreadFactory {
int count;
@Override public Thread newThread(Runnable r) {
++count;
return new Thread(r);
}
}
List<Integer> counts = new ArrayList<Integer>();
for (int n = 0; n < 100; ++n) {
CountingThreadFactory countingThreadFactory = new CountingThreadFactory();
ExecutorService exec = Executors.newCachedThreadPool(countingThreadFactory);
try {
BoundedExecutor be = new BoundedExecutor(exec, 4);
for (int i = 0; i < 20000; ++i) {
be.submitTask(new Runnable() {
@Override public void run() {}
});
}
} finally {
exec.shutdown();
}
counts.add(countingThreadFactory.count);
}
System.out.println(Collections.max(counts));
}
セマフォのリリースとタスクの終了の間には、リリーススレッドがまだ終了していないときに別のスレッドが許可を取得してタスクを送信できる、ごくわずかな時間枠があると思います。つまり、競合状態があります。
誰かがこれを確認できますか?