あなたがしようとしているように仕事を分割するように頼まれた場合、私は何をしますか:
public class App {
public static class Statistics {
}
public static class StatisticsCalculator implements Callable<Statistics> {
private final List<String> lines;
public StatisticsCalculator(List<String> lines) {
this.lines = lines;
}
@Override
public Statistics call() throws Exception {
//do stuff with lines
return new Statistics();
}
}
public static void main(String[] args) {
final File file = new File("path/to/my/file");
final List<List<String>> partitionedWork = partitionWork(readLines(file), 10);
final List<Callable<Statistics>> callables = new LinkedList<>();
for (final List<String> work : partitionedWork) {
callables.add(new StatisticsCalculator(work));
}
final ExecutorService executorService = Executors.newFixedThreadPool(Math.min(partitionedWork.size(), 10));
final List<Future<Statistics>> futures;
try {
futures = executorService.invokeAll(callables);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
try {
for (final Future<Statistics> future : futures) {
final Statistics statistics = future.get();
//do whatever to aggregate the individual
}
} catch (InterruptedException | ExecutionException ex) {
throw new RuntimeException(ex);
}
executorService.shutdown();
try {
executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
static List<String> readLines(final File file) {
//read lines
return new ArrayList<>();
}
static List<List<String>> partitionWork(final List<String> lines, final int blockSize) {
//divide up the incoming list into a number of chunks
final List<List<String>> partitionedWork = new LinkedList<>();
for (int i = lines.size(); i > 0; i -= blockSize) {
int start = i > blockSize ? i - blockSize : 0;
partitionedWork.add(lines.subList(start, i));
}
return partitionedWork;
}
}
オブジェクトを作成しましたStatistics
。これは、行われた作業の結果を保持します。
であるStatisticsCalculator
オブジェクトがありますCallable<Statistics>
- これは計算を行います。が与えられList<String>
、行を処理して を作成しStatistics
ます。
実装する方法をおreadLines
任せします。
多くの点で最も重要な方法は方法です。これは、ファイル内のすべての行であるpartitionWork
受信を. これは基本的に、各スレッドがどれだけの作業を行うべきかを決定します。パラメーターの調整は非常に重要です。各作業が 1 行のみである場合、オーバーヘッドはおそらく利点を上回りますが、各作業が 1 万行の場合は 1 つの作業しかありません。List<String>
List<List<String>>
blockSize
blockSize
Thread
最後に、操作の要はmain
方法です。これは read メソッドを呼び出し、次に partition メソッドを呼び出します。これは、ExecutorService
作業のビット数に等しい数のスレッドを生成しますが、最大 10 までです。これを、所有しているコアの数と等しくすることもできます。
main
メソッドは次に、List
すべてのCallable
の を、各チャンクに 1 つずつ、に送信しexecutorService
ます。メソッドはinvokeAll
、作業が完了するまでブロックします。
このメソッドは、返されたそれぞれをループし、それぞれに対してList<Future>
生成されたStatistics
オブジェクトを取得します。集計の準備ができました。
その後、忘れずに をシャットダウンしexecutorService
てください。アプリケーション フォームが終了できなくなります。
編集
OPは行ごとに読みたいので、ここに改訂がありますmain
public static void main(String[] args) throws IOException {
final File file = new File("path/to/my/file");
final ExecutorService executorService = Executors.newFixedThreadPool(10);
final List<Future<Statistics>> futures = new LinkedList<>();
try (final BufferedReader reader = new BufferedReader(new FileReader(file))) {
List<String> tmp = new LinkedList<>();
String line = null;
while ((line = reader.readLine()) != null) {
tmp.add(line);
if (tmp.size() == 100) {
futures.add(executorService.submit(new StatisticsCalculator(tmp)));
tmp = new LinkedList<>();
}
}
if (!tmp.isEmpty()) {
futures.add(executorService.submit(new StatisticsCalculator(tmp)));
}
}
try {
for (final Future<Statistics> future : futures) {
final Statistics statistics = future.get();
//do whatever to aggregate the individual
}
} catch (InterruptedException | ExecutionException ex) {
throw new RuntimeException(ex);
}
executorService.shutdown();
try {
executorService.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
これにより、ファイルが 1 行ずつストリーミングされ、指定された数の行が実行された後に新しいタスクが起動され、行がエグゼキューターに処理されます。
インスタンスは返される s による参照であるため、処理が完了clear
したらList<String>
inを呼び出す必要があります。それらを使い終わったときに sをクリアすると、メモリのフットプリントが大幅に削減されるはずです。Callable
Callable
Future
List
さらなる機能強化は、予備のスレッドが存在するまでブロックするここでの提案を使用することです。これにより、s が完了したときにsをクリアすると、一度に 2 行ExecutorService
以上のメモリが存在しないことが保証されます。threads*blocksize
List
Callable