何か問題が発生した場合に IOException をスローする必要があるファイルのディレクトリで作業するタスクがあります。また、高速化する必要があるため、完了した作業を複数のスレッドに分割し、それらの終了を待っています。次のようになります。
//Needs to throw IOException so the rest of the framework handles it properly.
public void process(File directory) throws IOException {
ExecutorService executorService =
new ThreadPoolExecutor(16, 16, Long.MAX_VALUE, TimeUnit.NANOSECONDS,
new LinkedBlockingQueue<Runnable>());
//Convenience class to walk over relevant file types.
Source source = new SourceImpl(directory);
while (source.hasNext()) {
File file = source.next();
executorService.execute(new Worker(file));
}
try {
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
executorService.shutdownNow();
throw new IOException("Worker thread had a problem!");
}
}
ワーカースレッドは基本的に次のとおりです。
private class Worker implements Runnable {
private final File file;
public Worker(File file) { this.file = file; }
@Override
public void run() {
try {
//Do work
} catch (IOException e) {
Thread.currentThread().interrupt();
}
}
}
望ましい動作は、Worker に IOException がある場合、生成スレッドがそれを認識し、独自の IOException をスローできることです。これは、ワーカー スレッドがエラーを通知できるようにするための最善の方法でしたが、正しく設定したかどうかはまだわかりません。
それで、まず第一に、これは私が期待していることをしますか?ワーカー スレッドの run() でエラーが発生した場合、呼び出しThread.currentThread().interrupt();
によって InterruptedException がスローされ、ブロックによってキャッチされるようになりexecutorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
ますか?
第 2 に、すべてのスレッドがキューに入れられる前に、実行中のワーカーがその割り込みを呼び出した場合はどうなりますか。ブロッキングの try/catch ブロックの前に?
最後に (そして最も重要なことに)、私の目的を達成するためのよりエレガントな方法はありますか? 無数のサブスレッドをすべて、完了するまで、またはいずれかのサブスレッドにエラーが発生するまで実行させたいと考えています。その時点で、(ディレクトリ全体を効果的に失敗させることによって) 生成スレッドで処理したいと考えています。
解決
答えに基づいて、これが私が最終的に使用した実装です。それは私の非同期の欲求をうまく処理し、IOExceptionsできれいにそして比較的速く失敗します。
public void process(File directory) throws IOException {
//Set up a thread pool of 16 to do work.
ExecutorService executorService = Executors.newFixedThreadPool(16);
//Arbitrary file source.
Source source = new SourceImpl(directory);
//List to hold references to all worker threads.
ArrayList<Callable<IOException>> filesToWork =
new ArrayList<Callable<IOException>>();
//Service to manage the running of the threads.
ExecutorCompletionService<IOException> ecs =
new ExecutorCompletionService<IOException>(executorService);
//Queue up all of the file worker threads.
while (source.hasNext())
filesToWork.add(new Worker(file));
//Store the potential results of each worker thread.
int n = filesToWork.size();
ArrayList<Future<IOException>> futures =
new ArrayList<Future<IOException>>(n);
//Prepare to return an arbitrary worker's exception.
IOException exception = null;
try {
//Add all workers to the ECS and Future collection.
for (Callable<IOException> callable : filesToWork)
futures.add(ecs.submit(callable));
for (int i = 0; i < n; i++) {
try {
//Get each result as it's available, sometimes blocking.
IOException e = ecs.take().get();
//Stop if an exception is returned.
if (e != null) {
exception = e;
break;
}
//Also catch our own exceptions.
} catch (InterruptedException e) {
exception = new IOException(e);
break;
} catch (ExecutionException e) {
exception = new IOException(e);
break;
}
}
} finally {
//Stop any pending tasks if we broke early.
for (Future<IOException> f : futures)
f.cancel(true);
//And kill all of the threads.
executorService.shutdownNow();
}
//If anything went wrong, it was preserved. Throw it now.
if (exception != null)
throw exception;
}
と
//Does work, and returns (not throws) an IOException object on error.
private class Worker implements Callable<IOException> {
private final File file;
public Worker(File file) { this.file = file; }
@Override
public IOException call() {
try {
//Do work
} catch (IOException e) {
return e;
}
return null;
}
}