一部の map-reduce コードを Spark に移行していますが、関数で返す Iterable を構築するときに問題が発生しています。MR コードでは、キーでグループ化された reduce 関数があり、(multipleOutputs を使用して) 値を反復処理し、(複数の出力で、それは重要ではありません) 次のようなコードに書き込みを使用します (単純化):
reduce(Key key, Iterable<Text> values) {
// ... some code
for (Text xml: values) {
multipleOutputs.write(key, val, directory);
}
}
ただし、Spark では、マップを変換し、これを次のシーケンスに変換しました: mapToPair -> groupByKey -> flatMap が推奨されているように... いくつかの本で。
mapToPair は基本的に functionMap を介してキーを追加します。これは、レコードのいくつかの値に基づいて、そのレコードのキーを作成します。キーのカーディナリティが非常に高い場合があります。
JavaPairRDD<Key, String> rddPaired = inputRDD.mapToPair(new PairFunction<String, Key, String>() {
public Tuple2<Key, String> call(String value) {
//...
return functionMap.call(value);
}
});
rddPairedにRDD.groupByKey () が適用され、RDD を取得して flatMap 関数にフィードします。
JavaPairRDD<Key, Iterable<String>> rddGrouped = rddPaired.groupByKey();
グループ化したら、reduceを実行するための flatMap 呼び出し。ここで、操作は変換です:
public Iterable<String> call (Tuple2<Key, Iterable<String>> keyValue) {
// some code...
List<String> out = new ArrayList<String>();
if (someConditionOnKey) {
// do a logic
Grouper grouper = new Grouper();
for (String xml : keyValue._2()) {
// group in a separate class
grouper.add(xml);
}
// operation is now performed on the whole group
out.add(operation(grouper));
} else {
for (String xml : keyValue._2()) {
out.add(operation(xml));
}
return out;
}
}
うまく機能します...レコードが多すぎないキーを使用します。実はreduceでelseに値の多いキーが入るとOutOfMemoryで壊れてしまいます。
注:作成したいロジックを説明するために「if」部分を含めましたが、「else」を入力すると失敗が発生します...データが「else」に入ると、通常はさらに多くの値があることを意味するためですデータの性質によるものです。
グループ化されたすべての値を「アウト」リストに保持する必要があるため、キーに何百万ものレコードがある場合、それらをメモリに保持するため、スケールされないことは明らかです。OOM が発生するポイントに到達しました (はい、それはメモリを要求する上記の「操作」を実行するときです-何も与えられません。ただし、非常に高価なメモリ操作ではありません)。
スケーリングするためにこれを回避する方法はありますか? よりスケーラブルな方法で同じ出力に到達するために他のいくつかのディレクティブを使用して動作を複製するか、マージするための値を Spark に渡すことができるようにするか (MR で行っていたように)...