3

私はたくさんのファイルを処理するプログラムを持っています。各ファイルに対して2つのことを行う必要があります。最初にファイルの一部を読み取って処理し、次に結果MyFileDataを保存します。最初の部分は並列化できますが、2番目の部分は並列化できません。

CPUはディスクを待機する必要があるため、すべてを順番に実行すると非常に遅くなります。その後、少し動作し、別の要求を発行して、再度待機します...

私は次のことをしました

class MyCallable implements Callable<MyFileData> {
    MyCallable(File file) {
        this.file = file;
    }
    public MyFileData call() {
        return someSlowOperation(file);
    }
    private final File file;
}

for (File f : files) futures.add(executorService.submit(new MyCallable(f)));
for (Future<MyFileData> f : futures) sequentialOperation(f.get());

そしてそれは大いに役立ちました。ただし、次の2つを改善したいと思います。

  • sequentialOperation、最初に利用可能な結果を​​処理するのではなく、固定された順序で実行されます。どうすれば変更できますか?

  • 処理するファイルは数千あり、数千のディスク要求を開始するとディスクのゴミ箱につながる可能性があります。使用するExecutors.newFixedThreadPool(10)ことでこの数を制限しましたが、もっと良いものを探しています。理想的には、セルフチューニングである必要があります。これにより、さまざまなコンピューターで最適に動作します(たとえば、RAIDNCQが使用可能な場合に、より多くの要求を発行します)。ハードウェアの構成を知ることに基づくものではないと思いますが、処理速度を測定し、それに基づいて最適化することは、どういうわけか可能であるはずです。何か案が?

4

2 に答える 2

6

シーケンシャルオペレーションは、最初に利用可能な結果を​​処理するのではなく、固定された順序で実行されます。どうすれば変更できますか?

これはまさにCompletionServiceが行うことです。タスクを並行して処理し、送信順序に関係なく、タスクが完了するとそれらを返します。

簡略化された(テストされていない)例:

int NUM_THREADS = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
CompletionService<MyFileData> completionService = new ExecutorCompletionService<MyFileData>(executor);

for (File f : files) futures.add(completionService.submit(new MyCallable(f)));

for(int i = 0; i < futures.size(); i++) {
    Future<MyFileData> next = completionService.take();
    sequentialOperation(next.get());
}

処理するファイルは数千あり、数千のディスク要求を開始するとディスクのゴミ箱につながる可能性があります。Executors.newFixedThreadPool(10)を使用して、この数を制限しましたが、もっと良いものを探しています。

私はそれについて100%確信していません。ディスクの数にもよると思いますが、ディスクアクセス部分をあまり多くのスレッドに分割しないでください(ディスクごとに1つのスレッドが賢明でしょう):多くのスレッドが同時に1つのディスクにアクセスする場合、それは読むよりも探すことに多くの時間を費やします。

于 2012-07-20T11:34:02.337 に答える
2

sequentialOperation は、最初に利用可能な結果を​​処理するのではなく、固定された順序で実行されます。どうすれば変更できますか?

前提: 各someSlowOperation(file);呼び出しにかかる時間はさまざまであるため、MyFileDataを受信したらすぐに処理する必要がありますが、別の と同時に処理する必要はありませんsequentialOperation

これは、プロデューサー/コンシューマー キューを設定することで実現できます。

プロデューサーは、callables例で実行するものであり、処理を待っている作業のキューに結果を追加するビットが追加されています。

消費者はsequentialOperation()呼び出しです - それは独自のスレッドで実行され、1 つしかありません。このスレッドが行うのは、キューの先頭を取得して処理し、プログラムが終了するまで繰り返すことだけです。

このようにして、マシン上のすべてのリソースを最大限に活用できます。

いくつかのサンプル コードを含む関連投稿:キューを使用したプロデューサー/コンシューマー スレッド

編集:これまでにやったことがない人にはかなり不透明なので、簡単なサンプルが必要かもしれないと思いました

public class Main {

    private final ExecutorService producerExecutor = Executors.newFixedThreadPool(10);
    private final ExecutorService consumerExecutor = Executors.newFixedThreadPool(1);
    private final LinkedBlockingQueue<MyData> queue = new LinkedBlockingQueue();//or some other impl

    abstract class Producer implements Runnable{
        private final File file;
        Producer(File file) {
            this.file = file;
        }

        public void run() {
            MyData result = someLongAssOperation(file);
            queue.offer(result);
        }

        public abstract void someLongAssOperation(File file);
    }

    abstract class Consumer implements Runnable {
        public void run() {
            while (true) {
                sequentialOperation(queue.take());  
            }
        }

        public abstract void sequentialOperation(MyData data);
    } 

    private void start() {
        consumerExecutor.submit(new Consumer(){
            //implement sequentialOperation here
        });

        for (File f : files) {
            producerExecutor.submit(new Producer(file) {
                //implement the someLongAssOperation()
            });
        }

    }

    public static void main(String[] args) {
        new Main().start();     
    } 

}
于 2012-07-20T12:00:09.347 に答える