3

java.util.concurrentパッケージの型を使用して、ディレクトリ内のすべてのファイルの処理を並列化する方法を理解しようとしています。

私はPythonのマルチプロセッシングパッケージに精通しています。これは非常に使いやすいので、理想的には次のようなものを探しています。

public interface FictionalFunctor<T>{
  void handle(T arg);
}

public class FictionalThreadPool {
  public FictionalThreadPool(int threadCount){
    ...
  }
  public <T> FictionalThreadPoolMapResult<T> map(FictionalFunctor<T> functor, List<T> args){
    // Executes the given functor on each and every arg from args in parallel. Returns, when
    // all the parallel branches return.
    // FictionalThreadPoolMapResult allows to abort the whole mapping process, at the least.
  }
}

dir = getDirectoryToProcess();
pool = new FictionalThreadPool(10);   // 10 threads in the pool
pool.map(new FictionalFunctor<File>(){ 
  @Override
  public void handle(File file){
    // process the file
  }
}, dir.listFiles());

のタイプで似たようなことができる気がしますが、java.util.concurrentどこから始めればいいのか全くわかりません。

何か案は?

ありがとう。

編集1

回答で与えられたアドバイスに従って、私は次のようなものを書きました:

public void processAllFiles() throws IOException {
  ExecutorService exec = Executors.newFixedThreadPool(6);
  BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<Runnable>(5); // Figured we can keep the contents of 6 files simultaneously.
  exec.submit(new MyCoordinator(exec, tasks));
  for (File file : dir.listFiles(getMyFilter()) {
    try {
      tasks.add(new MyTask(file));
    } catch (IOException exc) {
      System.err.println(String.format("Failed to read %s - %s", file.getName(), exc.getMessage()));
    }
  }
}

public class MyTask implements Runnable {
  private final byte[] m_buffer;
  private final String m_name;

  public MyTask(File file) throws IOException {
    m_name = file.getName();
    m_buffer = Files.toByteArray(file);
  }

  @Override
  public void run() {
    // Process the file contents
  }
}

private class MyCoordinator implements Runnable {
  private final ExecutorService m_exec;
  private final BlockingQueue<Runnable> m_tasks;

  public MyCoordinator(ExecutorService exec, BlockingQueue<Runnable> tasks) {
    m_exec = exec;
    m_tasks = tasks;
  }

  @Override
  public void run() {
    while (true) {
      Runnable task = m_tasks.remove();
      m_exec.submit(task);
    }
  }
}

コードが機能すると私が思った方法は次のとおりです。

  1. ファイルは次々に読み取られます。
  2. MyTaskファイルの内容は専用のインスタンスに保存されます。
  3. タスクを保持するための5の容量を持つブロッキングキュー。私は、サーバーが一度に最大6つのファイルの内容を保持できることを期待しています。5つはキューにあり、もう1つの完全に初期化されたタスクはキューに入るのを待っています。
  4. 特別なMyCoordinatorタスクがファイルタスクをキューからフェッチし、それらを同じプールにディスパッチします。

OK、バグがあります-6つ以上のタスクを作成できます。すべてのプールスレッドがビジーであっても、一部は送信されます。後で解決する予定です。

問題は、それがまったく機能しないことです。最初のMyCoordinator削除でスレッドがブロックされます-これは問題ありません。ただし、新しいタスクがキューに入れられたとしても、ブロックが解除されることはありません。誰かが私が間違っていることを教えてもらえますか?

4

3 に答える 3

1

探しているスレッドプールはExecutorServiceクラスです。を使用して、固定サイズのスレッドプールを作成できますnewFixedThreadPool。これにより、プールですべてのキューとワーカーの機能をカプセル化して、生産者/消費者パターンを簡単に実装できます。

ExecutorService exec = Executors.newFixedThreadPool(10);

次に、タイプが実装されているオブジェクトの形式でタスクを送信できますRunnable(またはCallable結果も取得したい場合)。

class ThreadTask implements Runnable {
    public void run() {
       // task code
    }
}

...

exec.submit(new ThreadTask());
// alternatively, using an anonymous type
exec.submit(new Runnable() {
           public void run() {
              // task code
           }
      });

複数のファイルを並行して処理するための大きなアドバイス:ファイルを保持する単一のメカニカルディスクがある場合は、単一のスレッドを使用してファイルを1つずつ読み取り、上記のように各ファイルをスレッドプールタスクに送信することをお勧めします。処理。パフォーマンスが低下するため、実際の読み取りを並行して実行しないでください。

于 2012-07-27T18:17:55.107 に答える
1

ExecuterServiceを使用するよりも簡単な解決策は、独自のプロデューサー-コンシューマースキームを実装することです。タスクを作成してLinkedBlockingQueueまたはArrayBlockingQueueに送信するスレッドを用意し、このキューをチェックしてタスクを取得して実行するワーカースレッドを用意します。ワーカーを強制的に終了させるExitTaskという名前の特別な種類のタスクが必要になる場合があります。したがって、ジョブの最後にn人のワーカーがいる場合は、n個のExitTasksをキューに追加する必要があります。

于 2012-07-27T18:41:11.960 に答える
1

基本的に、@ Tudorが言ったことは、ExecutorServiceを使用しますが、私は彼のコードを拡張したかったので、他の人の投稿を編集するのはいつも奇妙に感じます。ExecutorServiceに送信する内容の概要は次のとおりです。

public class MyFileTask implements Runnable {
   final File fileToProcess;

   public MyFileTask(File file) {
      fileToProcess = file;
   }

   public void run() {
      // your code goes here, e.g.
      handle(fileToProcess);
      // if you prefer, implement Callable instead
   }
}

行き詰まった場合の詳細については、こちらのブログ投稿も参照してください。

ファイルを処理するとIOExceptionが発生することが多いため、RunnableよりもCallable(チェックされた例外をスローできる)の方がいいと思いますが、YMMVです。

于 2012-07-27T19:14:00.153 に答える