1

もちろん、1つのスレッドでファイルから行を読み取ります。行はキーでソートされました。

次に、同じキーを持つ行 (15 ~ 20 行) を収集し、解析、大きな計算などを行い、結果のオブジェクトを統計クラスにプッシュします。

プログラムを並列化して、1 つのスレッドで読み取り、多くのスレッドで解析と計算を行い、結果を 1 つのスレッドに結合して stat クラスに書き込みたいと考えています。

この問題に対する Java7 フレームワークの準備ができているパターンまたは解決策はありますか?

マルチスレッド化、blockingQueue へのプッシュ、別のスレッドでキューを読み取るためのエグゼキュータで実現していますが、私のコードはうまくいかず、バグが発生すると思います

どうもありがとう

更新:

メモリ内のすべてのファイルをマップできません - 非常に大きいです

4

4 に答える 4

2

アプローチの主要なクラスはすでに念頭に置いています。CountDownLatch、Thread.join、エグゼキュータ、フォーク/ジョイン。もう 1 つのオプションは、1 ~ 2 マイクロ秒で測定されるメッセージ パッシング オーバーヘッドを持ち、オープン ソースである Akka フレームワークです。ただし、上記のアプローチよりもパフォーマンスが高く、より単純な別のアプローチを共有させてください。このアプローチは、多くの企業で Java でのバッチ ファイルの読み込みに取り組んだことから生まれました。

仕事を分割するというあなたの目標は、学習ではなくパフォーマンスであると仮定します. 開始から終了までの所要時間で測定されるパフォーマンス。その場合、ファイルをメモリ マッピングし、単一のコアに固定された単一のスレッドで処理するよりも高速にすることは、多くの場合困難です。また、はるかに単純なコードも提供します。二連勝。

これは直感に反するかもしれませんが、ファイルの処理速度は、ほとんどの場合、ファイルの読み込みの効率によって制限されます。処理の並列性ではありません。したがって、ファイルのメモリ マッピングは大きなメリットです。メモリがマップされたら、アルゴリズムがファイルのロードを実行するときに、ハードウェアとの競合が少なくなるようにします。最新のハードウェアでは、IO コントローラとメモリ コントローラが CPU と同じソケットにある傾向があります。これを CPU 自体のプリフェッチャーと組み合わせると、ファイルを単一のスレッドから整然と処理するときに非常に効率的になります。これは非常に極端な場合があるため、並列処理は実際にはかなり遅くなる可能性があります。スレッドをコアに固定すると、通常、メモリにバインドされたアルゴリズムが 5 倍高速化されます。これが、メモリ マッピング部分が非常に重要な理由です。

まだお持ちでない場合は、ぜひお試しください。

于 2013-03-15T13:44:11.087 に答える
0

CountDownLatch http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.htmlを使用できます。

スレッドの開始と参加を同期します。これは、一連のスレッドでループし、各スレッド参照で join() を呼び出すよりも優れています。

于 2013-03-15T12:23:28.483 に答える
0

あなたがしようとしているように仕事を分割するように頼まれた場合、私は何をしますか:

public class App {

    public static class Statistics {
    }

    public static class StatisticsCalculator implements Callable<Statistics> {

        private final List<String> lines;

        public StatisticsCalculator(List<String> lines) {
            this.lines = lines;
        }

        @Override
        public Statistics call() throws Exception {
            //do stuff with lines
            return new Statistics();
        }
    }

    public static void main(String[] args) {
        final File file = new File("path/to/my/file");
        final List<List<String>> partitionedWork = partitionWork(readLines(file), 10);
        final List<Callable<Statistics>> callables = new LinkedList<>();
        for (final List<String> work : partitionedWork) {
            callables.add(new StatisticsCalculator(work));
        }
        final ExecutorService executorService = Executors.newFixedThreadPool(Math.min(partitionedWork.size(), 10));
        final List<Future<Statistics>> futures;
        try {
            futures = executorService.invokeAll(callables);
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
        try {
            for (final Future<Statistics> future : futures) {
                final Statistics statistics = future.get();
                //do whatever to aggregate the individual
            }
        } catch (InterruptedException | ExecutionException ex) {
            throw new RuntimeException(ex);
        }
        executorService.shutdown();
        try {
            executorService.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }

    static List<String> readLines(final File file) {
        //read lines
        return new ArrayList<>();
    }

    static List<List<String>> partitionWork(final List<String> lines, final int blockSize) {
        //divide up the incoming list into a number of chunks
        final List<List<String>> partitionedWork = new LinkedList<>();
        for (int i = lines.size(); i > 0; i -= blockSize) {
            int start = i > blockSize ? i - blockSize : 0;
            partitionedWork.add(lines.subList(start, i));
        }
        return partitionedWork;
    }
}

オブジェクトを作成しましたStatistics。これは、行われた作業の結果を保持します。

であるStatisticsCalculatorオブジェクトがありますCallable<Statistics>- これは計算を行います。が与えられList<String>、行を処理して を作成しStatisticsます。

実装する方法をおreadLines任せします。

多くの点で最も重要な方法は方法です。これは、ファイル内のすべての行であるpartitionWork受信を. これは基本的に、各スレッドがどれだけの作業を行うべきかを決定します。パラメーターの調整は非常に重要です。各作業が 1 行のみである場合、オーバーヘッドはおそらく利点を上回りますが、各作業が 1 万行の場合は 1 つの作業しかありません。List<String>List<List<String>>blockSizeblockSizeThread

最後に、操作の要はmain方法です。これは read メソッドを呼び出し、次に partition メソッドを呼び出します。これは、ExecutorService作業のビット数に等しい数のスレッドを生成しますが、最大 10 までです。これを、所有しているコアの数と等しくすることもできます。

mainメソッドは次に、ListすべてのCallableの を、各チャンクに 1 つずつ、に送信しexecutorServiceます。メソッドはinvokeAll、作業が完了するまでブロックします。

このメソッドは、返されたそれぞれをループし、それぞれに対してList<Future>生成されたStatisticsオブジェクトを取得します。集計の準備ができました。

その後、忘れずに をシャットダウンしexecutorServiceてください。アプリケーション フォームが終了できなくなります。

編集

OPは行ごとに読みたいので、ここに改訂がありますmain

 public static void main(String[] args) throws IOException {
    final File file = new File("path/to/my/file");
    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    final List<Future<Statistics>> futures = new LinkedList<>();
    try (final BufferedReader reader = new BufferedReader(new FileReader(file))) {
        List<String> tmp = new LinkedList<>();
        String line = null;
        while ((line = reader.readLine()) != null) {
            tmp.add(line);
            if (tmp.size() == 100) {
                futures.add(executorService.submit(new StatisticsCalculator(tmp)));
                tmp = new LinkedList<>();
            }
        }
        if (!tmp.isEmpty()) {
            futures.add(executorService.submit(new StatisticsCalculator(tmp)));
        }
    }
    try {
        for (final Future<Statistics> future : futures) {
            final Statistics statistics = future.get();
            //do whatever to aggregate the individual
        }
    } catch (InterruptedException | ExecutionException ex) {
        throw new RuntimeException(ex);
    }
    executorService.shutdown();
    try {
        executorService.awaitTermination(1, TimeUnit.DAYS);
    } catch (InterruptedException ex) {
        throw new RuntimeException(ex);
    }
}

これにより、ファイルが 1 行ずつストリーミングされ、指定された数の行が実行された後に新しいタスクが起動され、行がエグゼキューターに処理されます。

インスタンスは返される s による参照であるため、処理が完了clearしたらList<String>inを呼び出す必要があります。それらを使い終わったときに sをクリアすると、メモリのフットプリントが大幅に削減されるはずです。CallableCallableFutureList

さらなる機能強化は、予備のスレッドが存在するまでブロックするここでの提案を使用することです。これにより、s が完了したときにsをクリアすると、一度に 2 行ExecutorService以上のメモリが存在しないことが保証されます。threads*blocksizeListCallable

于 2013-03-15T12:31:33.017 に答える