java.util.concurrent
パッケージの型を使用して、ディレクトリ内のすべてのファイルの処理を並列化する方法を理解しようとしています。
私はPythonのマルチプロセッシングパッケージに精通しています。これは非常に使いやすいので、理想的には次のようなものを探しています。
public interface FictionalFunctor<T>{
void handle(T arg);
}
public class FictionalThreadPool {
public FictionalThreadPool(int threadCount){
...
}
public <T> FictionalThreadPoolMapResult<T> map(FictionalFunctor<T> functor, List<T> args){
// Executes the given functor on each and every arg from args in parallel. Returns, when
// all the parallel branches return.
// FictionalThreadPoolMapResult allows to abort the whole mapping process, at the least.
}
}
dir = getDirectoryToProcess();
pool = new FictionalThreadPool(10); // 10 threads in the pool
pool.map(new FictionalFunctor<File>(){
@Override
public void handle(File file){
// process the file
}
}, dir.listFiles());
のタイプで似たようなことができる気がしますが、java.util.concurrent
どこから始めればいいのか全くわかりません。
何か案は?
ありがとう。
編集1
回答で与えられたアドバイスに従って、私は次のようなものを書きました:
public void processAllFiles() throws IOException {
ExecutorService exec = Executors.newFixedThreadPool(6);
BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<Runnable>(5); // Figured we can keep the contents of 6 files simultaneously.
exec.submit(new MyCoordinator(exec, tasks));
for (File file : dir.listFiles(getMyFilter()) {
try {
tasks.add(new MyTask(file));
} catch (IOException exc) {
System.err.println(String.format("Failed to read %s - %s", file.getName(), exc.getMessage()));
}
}
}
public class MyTask implements Runnable {
private final byte[] m_buffer;
private final String m_name;
public MyTask(File file) throws IOException {
m_name = file.getName();
m_buffer = Files.toByteArray(file);
}
@Override
public void run() {
// Process the file contents
}
}
private class MyCoordinator implements Runnable {
private final ExecutorService m_exec;
private final BlockingQueue<Runnable> m_tasks;
public MyCoordinator(ExecutorService exec, BlockingQueue<Runnable> tasks) {
m_exec = exec;
m_tasks = tasks;
}
@Override
public void run() {
while (true) {
Runnable task = m_tasks.remove();
m_exec.submit(task);
}
}
}
コードが機能すると私が思った方法は次のとおりです。
- ファイルは次々に読み取られます。
MyTask
ファイルの内容は専用のインスタンスに保存されます。- タスクを保持するための5の容量を持つブロッキングキュー。私は、サーバーが一度に最大6つのファイルの内容を保持できることを期待しています。5つはキューにあり、もう1つの完全に初期化されたタスクはキューに入るのを待っています。
- 特別な
MyCoordinator
タスクがファイルタスクをキューからフェッチし、それらを同じプールにディスパッチします。
OK、バグがあります-6つ以上のタスクを作成できます。すべてのプールスレッドがビジーであっても、一部は送信されます。後で解決する予定です。
問題は、それがまったく機能しないことです。最初のMyCoordinator
削除でスレッドがブロックされます-これは問題ありません。ただし、新しいタスクがキューに入れられたとしても、ブロックが解除されることはありません。誰かが私が間違っていることを教えてもらえますか?