Producer、Processor、Consumer の 3 つのスレッドがあり、それらすべてにデータを共有するためのブロッキング キューがあります。私はこれらのスレッドに参加したかったのですが、そのために future を使用しているので、コードは次のようになります -
public class Test {
private static class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
private Producer(BlockingQueue<Integer> queue) {
this.queue = checkNotNull(queue);
}
@Override public void run() {
try {
int i = 0;
while (++i < Integer.MAX_VALUE) {
addEntry(i);
}
} finally {
addEntry(-1);
}
}
private void addEntry(int i) {
try {
queue.put(i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private static class Processor implements Runnable {
private final BlockingQueue<Integer> readQueue;
private final BlockingQueue<Integer> writeQueue;
private Processor(BlockingQueue<Integer> readQueue, BlockingQueue<Integer> writeQueue) {
this.readQueue = checkNotNull(readQueue);
this.writeQueue = checkNotNull(writeQueue);
}
@Override public void run() {
try {
int i = readQueue.take();
while (i != -1) {
writeQueue.put(i);
i = readQueue.take();
if(i==1000){
throw new NullPointerException();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
addEntry(-1);
}
}
private void addEntry(int i) {
try {
writeQueue.put(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private static class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
private Consumer(BlockingQueue<Integer> queue) {
this.queue = checkNotNull(queue);
}
@Override public void run() {
try {
int i = queue.take();
while (i != -1) {
System.out.println(i);
i = queue.take();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
BlockingQueue<Integer> readQueue = new ArrayBlockingQueue<>(1000);
BlockingQueue<Integer> writeQueue = new ArrayBlockingQueue<>(1000);
ExecutorService executorService = Executors.newFixedThreadPool(3);
Runnable[] runnables = new Runnable[]{new Producer(readQueue), new Processor(readQueue, writeQueue), new Consumer(writeQueue)};
List<Future<?>> futures = Lists.newArrayList();
for (Runnable runnable : runnables) {
futures.add(executorService.submit(runnable));
}
executorService.shutdown();
for (Future<?> future : futures) {
try {
future.get();
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
} catch( ExecutionException e){
executorService.shutdownNow();
throw new RuntimeException(e);
}finally{
future.cancel(true);
}
}
System.out.println("Done..");
}
}
Futute#get() が例外 (プロセッサー内の NPE) をスローした場合、すべてのスレッド (プロデューサー、プロセッサー、コンシューマー) を停止し、正常に終了します。
どうすればそれを達成できますか?