50 台のマシンで Spark クラスターを実行しています。各マシンは 8 コアの VM で、メモリは 50 GB です (41 は Spark で使用できるようです)。
私はいくつかの入力フォルダーで実行しています。入力のサイズは gz 圧縮で ~250GB と見積もっています。
私が使用しているマシンの数と構成は十分であるように思えますが、約 40 分の実行後にジョブが失敗し、ログに次のエラーが表示されます。
2558733 [Result resolver thread-2] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 345.0 in stage 1.0 (TID 345, hadoop-w-3.c.taboola-qa-01.internal): java.lang.OutOfMemoryError: Java heap space
java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149)
java.lang.StringCoding.decode(StringCoding.java:193)
java.lang.String.<init>(String.java:416)
java.lang.String.<init>(String.java:481)
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:699)
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:660)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
また:
2653545 [Result resolver thread-2] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 122.1 in stage 1.0 (TID 392, hadoop-w-22.c.taboola-qa-01.internal): java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149)
java.lang.StringCoding.decode(StringCoding.java:193)
java.lang.String.<init>(String.java:416)
java.lang.String.<init>(String.java:481)
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:699)
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:660)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
このような問題をデバッグするにはどうすればよいですか?
編集:問題の根本原因を見つけました。それはこのコードです:
private static final int MAX_FILE_SIZE = 40194304;
....
....
JavaPairRDD<String, List<String>> typedData = filePaths.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, List<String>>() {
@Override
public Iterable<Tuple2<String, List<String>>> call(Iterator<String> filesIterator) throws Exception {
List<Tuple2<String, List<String>>> res = new ArrayList<>();
String fileType = null;
List<String> linesList = null;
if (filesIterator != null) {
while (filesIterator.hasNext()) {
try {
Path file = new Path(filesIterator.next());
// filter non-trc files
if (!file.getName().startsWith("1")) {
continue;
}
fileType = getType(file.getName());
Configuration conf = new Configuration();
CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
CompressionCodec codec = compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(conf);
ContentSummary contentSummary = fs.getContentSummary(file);
long fileSize = contentSummary.getLength();
InputStream in = fs.open(file);
if (codec != null) {
in = codec.createInputStream(in);
} else {
throw new IOException();
}
byte[] buffer = new byte[MAX_FILE_SIZE];
BufferedInputStream bis = new BufferedInputStream(in, BUFFER_SIZE);
int count = 0;
int bytesRead = 0;
try {
while ((bytesRead = bis.read(buffer, count, BUFFER_SIZE)) != -1) {
count += bytesRead;
}
} catch (Exception e) {
log.error("Error reading file: " + file.getName() + ", trying to read " + BUFFER_SIZE + " bytes at offset: " + count);
throw e;
}
Iterable<String> lines = Splitter.on("\n").split(new String(buffer, "UTF-8").trim());
linesList = Lists.newArrayList(lines);
// get rid of first line in file
Iterator<String> it = linesList.iterator();
if (it.hasNext()) {
it.next();
it.remove();
}
//res.add(new Tuple2<>(fileType,linesList));
} finally {
res.add(new Tuple2<>(fileType, linesList));
}
}
}
return res;
}
特に、BufferedInputStream を使用してファイルのコンテンツを読み取るために、各ファイルにサイズ 40M のバッファーを割り当てます。これにより、スタック メモリがある時点で終了します。
問題は次のとおりです。
- 行ごとに読み取ると(バッファは必要ありません)、非常に非効率的な読み取りになります
- 1 つのバッファーを割り当て、ファイルの読み取りごとに再利用すると、並列処理の意味で可能ですか? それとも、いくつかのスレッドによって上書きされますか?
どんな提案も大歓迎です...
編集 2: バイト配列の割り当てをイテレータの外に移動することで最初のメモリの問題を修正し、すべてのパーティション要素で再利用されるようにしました。ただし、分割の目的で作成される新しい String(buffer, "UTF-8").trim()) がまだあります。これは、毎回作成されるオブジェクトでもあります。stringbuffer/builder を使用できますが、String オブジェクトなしで文字セット エンコーディングを設定するにはどうすればよいでしょうか。