0

Producer、Processor、Consumer の 3 つのスレッドがあり、それらすべてにデータを共有するためのブロッキング キューがあります。私はこれらのスレッドに参加したかったのですが、そのために future を使用しているので、コードは次のようになります -

public class Test {


    private static class Producer implements Runnable {

        private final BlockingQueue<Integer> queue;

        private Producer(BlockingQueue<Integer> queue) {
            this.queue = checkNotNull(queue);
        }

        @Override public void run() {
            try {
                int i = 0;
                while (++i < Integer.MAX_VALUE) {
                    addEntry(i);
                }
            } finally {
                addEntry(-1);
            }
        }

        private void addEntry(int i) {
            try {
                queue.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private static class Processor implements Runnable {

        private final BlockingQueue<Integer> readQueue;
        private final BlockingQueue<Integer> writeQueue;

        private Processor(BlockingQueue<Integer> readQueue, BlockingQueue<Integer> writeQueue) {
            this.readQueue = checkNotNull(readQueue);
            this.writeQueue = checkNotNull(writeQueue);
        }

        @Override public void run() {
            try {
                int i = readQueue.take();
                while (i != -1) {
                    writeQueue.put(i);
                    i = readQueue.take();
                    if(i==1000){
                        throw new NullPointerException();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                addEntry(-1);
            }
        }

        private void addEntry(int i) {
            try {
                writeQueue.put(i);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private static class Consumer implements Runnable {

        private final BlockingQueue<Integer> queue;

        private Consumer(BlockingQueue<Integer> queue) {
            this.queue = checkNotNull(queue);
        }

        @Override public void run() {
            try {
                int i = queue.take();
                while (i != -1) {
                    System.out.println(i);
                    i = queue.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

        }
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> readQueue = new ArrayBlockingQueue<>(1000);
        BlockingQueue<Integer> writeQueue = new ArrayBlockingQueue<>(1000);
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        Runnable[] runnables = new Runnable[]{new Producer(readQueue), new Processor(readQueue, writeQueue), new Consumer(writeQueue)};
        List<Future<?>> futures = Lists.newArrayList();
        for (Runnable runnable : runnables) {
            futures.add(executorService.submit(runnable));
        }
        executorService.shutdown();
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                executorService.shutdownNow();
                Thread.currentThread().interrupt();
            } catch( ExecutionException e){
                executorService.shutdownNow();
                throw new RuntimeException(e);
            }finally{
                future.cancel(true);
            }
        }
        System.out.println("Done..");
    }
}

Futute#get() が例外 (プロセッサー内の NPE) をスローした場合、すべてのスレッド (プロデューサー、プロセッサー、コンシューマー) を停止し、正常に終了します。

どうすればそれを達成できますか?

4

1 に答える 1

0

スレッドを強制的に終了させることはできません。シグナルを受信したときに終了するようにスレッドをコーディングすることしかできません。スレッドの終了を通知する方法として、スレッド割り込みを使用できます。を呼び出したときに実行中executorService.shutdownNow()のスレッドを終了する場合は、スレッドが中断されたときにスレッドを終了する必要があります。

たとえば、あなたのConsumer

    @Override public void run() {
        try {
            int i = queue.take();
            while (i != -1) {
                System.out.println(i);
                i = queue.take();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

スレッドが中断されると、スレッドはinterrupt()再び自分自身を呼び出します。これでは何も達成されません。このrunメソッドは、次を受け取ったときに戻る必要がありInterruptedExceptionます。

    @Override public void run() {
        try {
            int i = queue.take();
            while (i != -1) {
                System.out.println(i);
                i = queue.take();
            }
        } catch (InterruptedException e) {
            System.err.println("Consumer interrupted");
            return;
        }
于 2013-09-12T13:34:43.027 に答える