3

何か問題が発生した場合に IOException をスローする必要があるファイルのディレクトリで作業するタスクがあります。また、高速化する必要があるため、完了した作業を複数のスレッドに分割し、それらの終了を待っています。次のようになります。

//Needs to throw IOException so the rest of the framework handles it properly.
public void process(File directory) throws IOException {
    ExecutorService executorService =
        new ThreadPoolExecutor(16, 16, Long.MAX_VALUE, TimeUnit.NANOSECONDS,
            new LinkedBlockingQueue<Runnable>());

    //Convenience class to walk over relevant file types.
    Source source = new SourceImpl(directory);
    while (source.hasNext()) {
        File file = source.next();
        executorService.execute(new Worker(file));
    }

    try {
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        executorService.shutdownNow();
        throw new IOException("Worker thread had a problem!");
    }
}

ワーカースレッドは基本的に次のとおりです。

private class Worker implements Runnable {
    private final File file;
    public Worker(File file) { this.file = file; }

    @Override
    public void run() {
        try {
            //Do work
        } catch (IOException e) {
            Thread.currentThread().interrupt();
        }
    }
}

望ましい動作は、Worker に IOException がある場合、生成スレッドがそれを認識し、独自の IOException をスローできることです。これは、ワーカー スレッドがエラーを通知できるようにするための最善の方法でしたが、正しく設定したかどうかはまだわかりません。

それで、まず第一に、これは私が期待していることをしますか?ワーカー スレッドの run() でエラーが発生した場合、呼び出しThread.currentThread().interrupt();によって InterruptedException がスローされ、ブロックによってキャッチされるようになりexecutorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);ますか?

第 2 に、すべてのスレッドがキューに入れられる前に、実行中のワーカーがその割り込みを呼び出した場合はどうなりますか。ブロッキングの try/catch ブロックの前に?

最後に (そして最も重要なことに)、私の目的を達成するためのよりエレガントな方法はありますか? 無数のサブスレッドをすべて、完了するまで、またはいずれかのサブスレッドにエラーが発生するまで実行させたいと考えています。その時点で、(ディレクトリ全体を効果的に失敗させることによって) 生成スレッドで処理したいと考えています。


解決

答えに基づいて、これが私が最終的に使用した実装です。それは私の非同期の欲求をうまく処理し、IOExceptionsできれいにそして比較的速く失敗します。

public void process(File directory) throws IOException {
    //Set up a thread pool of 16 to do work.
    ExecutorService executorService = Executors.newFixedThreadPool(16);
    //Arbitrary file source.
    Source source = new SourceImpl(directory);
    //List to hold references to all worker threads.
    ArrayList<Callable<IOException>> filesToWork =
        new ArrayList<Callable<IOException>>();
    //Service to manage the running of the threads.
    ExecutorCompletionService<IOException> ecs =
        new ExecutorCompletionService<IOException>(executorService);

    //Queue up all of the file worker threads.
    while (source.hasNext())
        filesToWork.add(new Worker(file));

    //Store the potential results of each worker thread.
    int n = filesToWork.size();
    ArrayList<Future<IOException>> futures =
        new ArrayList<Future<IOException>>(n);

    //Prepare to return an arbitrary worker's exception.
    IOException exception = null;
    try {
        //Add all workers to the ECS and Future collection.
        for (Callable<IOException> callable : filesToWork)
            futures.add(ecs.submit(callable));
        for (int i = 0; i < n; i++) {
            try {
                //Get each result as it's available, sometimes blocking.
                IOException e = ecs.take().get();
                //Stop if an exception is returned.
                if (e != null) {
                    exception = e;
                    break;
                }
            //Also catch our own exceptions.
            } catch (InterruptedException e) {
                exception = new IOException(e);
                break;
            } catch (ExecutionException e) {
                exception = new IOException(e);
                break;
            }
        }
    } finally {
        //Stop any pending tasks if we broke early.
        for (Future<IOException> f : futures)
            f.cancel(true);
        //And kill all of the threads.
        executorService.shutdownNow();
    }

    //If anything went wrong, it was preserved. Throw it now.
    if (exception != null)
        throw exception;
}

//Does work, and returns (not throws) an IOException object on error.
private class Worker implements Callable<IOException> {
    private final File file;
    public Worker(File file) { this.file = file; }

    @Override
    public IOException call() {
        try {
            //Do work
        } catch (IOException e) {
            return e;
        }
        return null;
    }
}
4

3 に答える 3

2

interrupt()そのように呼び出しても、メイン スレッドには影響し ません。

代わりにすべきことは、ワーカーを aCallableではなく a にRunnableし、失敗例外がcall()メソッドを離れることを許可することです。次に、ExecutorCompletionServiceを使用してすべてのワーカーを実行します。これにより、各タスクのステータスを判断し、タスクの 1 つが失敗した場合にメイン スレッドでアクションを実行できます。

于 2012-07-09T19:47:14.820 に答える
1

Worker を実装するCallable<Void>ようにすると、次のように実行できます。

public void process(File directory) throws IOException {
    ExecutorService executorService = new ThreadPoolExecutor(16, 16,
            Long.MAX_VALUE, TimeUnit.NANOSECONDS,
            new LinkedBlockingQueue<Runnable>());

    // Convenience class to walk over relevant file types.
    List<Future<Void>> futures = new ArrayList<Future<Void>>();
    Source source = new SourceImpl(directory);
    while (source.hasNext()) {
        File file = source.next();
        futures.add(executorService.submit(new Worker(file)));
    }

    try {
        for (Future<Void> future : futures) {
            future.get();
        }
    } catch (ExecutionException e) {
        throw new IOException("Worker thread had a problem!", e.getCause());
    } catch (InterruptedException e) {
        throw new IOException("Worker thread had a problem!", e);
    } finally {
        executorService.shutdown();
    }
}
于 2012-07-09T20:16:19.103 に答える
1

いつものように、スレッド間の最適な通信はキューです。各ワーカーがその実行がどのように終了したかを説明するメッセージを送信し、生成スレッドがキューから読み取れるようにします。さらに、スポーン スレッドは自分がスポーンしたワーカーの数を知っているため、メッセージをカウントするだけで、プールのシャットダウンに依存せずに、すべてのワーカーがいつ終了したかを知ることができます。

于 2012-07-09T19:32:35.077 に答える