0

Spark 1.1 の使用

私は次のような仕事をしています:

  1. 特定のルートの下にあるフォルダーのリストを読み取り、リストを並列化します
  2. 各フォルダについて、その下のファイルを読み取ります - これらは gzip されたファイルです
  3. ファイルごとにコンテンツを抽出します。これらは行であり、各行は単一のイベントを表し、フィールドはタブ (TSV) で区切られています
  4. すべての行の単一の RDD を作成します。
  5. TSV を json に変換します。

(現在、線は特定のイベント タイプを表しています。4 つのタイプがあります: セッション、リクエスト、レコメンデーション、ユーザー イベント)

  1. セッション イベントのみを除外します。一部のユーザーIDフィールドに従って、それらの1:100のみをサンプリングします。いくつかの出力構造 (イベントの種類/日付/イベントなど) を表すキーを使用して、それらをペアに変換し、FS に書き込みます。
  2. リクエストとユーザー イベントについても同じことを行います

(レコメンデーションの場合、ユーザー ID に従ってサンプリングを行うことはできません (ユーザー ID がそこに存在しないため)。ただし、相互のリクエスト ID フィールドに基づいて、リクエストとレコメンデーションの間に 1 対 1 の関係があることがわかっています。したがって、:)

  1. 個別のリクエスト ID のリストを作成します。このリストをリクエスト ID に基づく推奨リストにキーとして結合し、必要なフィルタリングを実現します。次に、縮小されたリストを FS に出力します。

さて、ここに私の問題があります。これらのことを行うために使用するコードは、小規模で機能します。しかし、比較的大きな入力で実行し、8 コアとそれぞれ 50 GB のメモリを備えた 80 台のマシンのクラスターを使用すると、多くのマシンが使用されていないことがわかります。つまり、1 つのコアしか占有されていません (また、約 20% しか使用されていません)。メモリは、ジョブに構成された 40 GB のうち 16 GB のみです。

どこかで変換がうまく並列化されていないと思いますが、どこで、なぜなのかわかりません。これが私のコードのほとんどです(問題とは無関係だと思われる補助機能の一部を省略しています)

 public static void main(String[] args) {

    BasicConfigurator.configure();

    conf[0] = new Conf("local[4]");
    conf[1] = new Conf("spark://hadoop-m:7077");
    Conf configuration = conf[1];

    if (args.length != 4) {
        log.error("Error in parameters. Syntax: <input path> <output_path> <filter_factor> <locality>\nfilter_factor is what fraction of sessions to process. For example, to process 1/100 of sessions, use 100\nlocality should be set to \"local\" in case running on local environment, and to \"remote\" otherwise.");
        System.exit(-1);
    }

    final String inputPath = args[0];
    final String outputPath = args[1];
    final Integer filterFactor;

    if (args[3].equals("local")) {
        configuration = conf[0];
    }

    log.setLevel(Level.DEBUG);
    Logger.getRootLogger().removeAppender("console");
    final SparkConf conf = new SparkConf().setAppName("phase0").setMaster(configuration.getMaster());
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.kryo.registrator", "com.doit.customer.dataconverter.MyRegistrator");
    final JavaSparkContext sc = new JavaSparkContext(conf);
    if (configuration.getMaster().contains("spark:")) {
        sc.addJar("/home/hadoop/hadoop-install/phase0-1.0-SNAPSHOT-jar-with-dependencies.jar");
    }
    try {
        filterFactor = Integer.parseInt(args[2]);
        // read all folders from root
        Path inputPathObj = new Path(inputPath);
        FileSystem fs = FileSystem.get(inputPathObj.toUri(), new Configuration(true));
        FileStatus[] statusArr = fs.globStatus(inputPathObj);
        List<FileStatus> statusList = Arrays.asList(statusArr);

        List<String> pathsStr = convertFileStatusToPath(statusList);

        JavaRDD<String> paths = sc.parallelize(pathsStr);

        // read all files from each folder
        JavaRDD<String> filePaths = paths.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
            @Override
            public Iterable<String> call(Iterator<String> pathsIterator) throws Exception {
                List<String> filesPath = new ArrayList<String>();
                if (pathsIterator != null) {
                    while (pathsIterator.hasNext()) {
                        String currFolder = pathsIterator.next();
                        Path currPath = new Path(currFolder);
                        FileSystem fs = FileSystem.get(currPath.toUri(), new Configuration(true));
                        FileStatus[] files = fs.listStatus(currPath);
                        List<FileStatus> filesList = Arrays.asList(files);
                        List<String> filesPathsStr = convertFileStatusToPath(filesList);
                        filesPath.addAll(filesPathsStr);
                    }
                }
                return filesPath;
            }
        });


        // Transform list of files to list of all files' content in lines
        JavaRDD<String> typedData = filePaths.map(new Function<String, List<String>>() {
            @Override
            public List<String> call(String filePath) throws Exception {
                Tuple2<String, List<String>> tuple = null;
                try {
                    String fileType = null;
                    List<String> linesList = new ArrayList<String>();
                    Configuration conf = new Configuration();
                    CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
                    Path path = new Path(filePath);
                    fileType = getType(path.getName());

                    // filter non-trc files
                    if (!path.getName().startsWith("1")) {
                        return linesList;
                    }

                    CompressionCodec codec = compressionCodecs.getCodec(path);
                    FileSystem fs = path.getFileSystem(conf);
                    InputStream in = fs.open(path);
                    if (codec != null) {
                        in = codec.createInputStream(in);
                    } else {
                        throw new IOException();
                    }

                    BufferedReader r = new BufferedReader(new InputStreamReader(in, "UTF-8"), BUFFER_SIZE);

                    // This line will not be added to the list ,
                    // which is what we want - filter the header row
                    String line = r.readLine();

                    // Read all lines
                    while ((line = r.readLine()) != null) {
                        try {
                            String sliceKey = getSliceKey(line, fileType);
                            // Adding event type and output slice key as additional fields
                            linesList.add(fileType + "\t" + sliceKey + "\t" + line);
                        } catch(ParseException e) {
                        }
                    }

                    return linesList;
                } catch (Exception e) { // Filtering of files whose reading went wrong
                    log.error("Reading of the file " + filePath + " went wrong: " + e.getMessage());
                    return new ArrayList();
                }
            }
            // flatten to one big list with all the lines
        }).flatMap(new FlatMapFunction<List<String>, String>() {
            @Override
            public Iterable<String> call(List<String> strings) throws Exception {
                return strings;
            }
        });

        // convert tsv to json

        JavaRDD<ObjectNode> jsons = typedData.mapPartitions(new FlatMapFunction<Iterator<String>, ObjectNode>() {
            @Override
            public Iterable<ObjectNode> call(Iterator<String> stringIterator) throws Exception {
                List<ObjectNode> res = new ArrayList<>();
                while(stringIterator.hasNext()) {
                    String currLine = stringIterator.next();
                    Iterator<String> i = Splitter.on("\t").split(currLine).iterator();
                    if (i.hasNext()) {
                        String type = i.next();
                        ObjectNode json = convert(currLine, type, filterFactor);
                        if(json != null) {
                            res.add(json);
                        }
                    }
                }
                return res;
            }
        }).cache();


        createOutputType(jsons, "Session", outputPath, null);
        createOutputType(jsons, "UserEvent", outputPath, null);
        JavaRDD<ObjectNode> requests = createOutputType(jsons, "Request", outputPath, null);


        // Now leave only the set of request ids - to inner join with the recommendations
        JavaPairRDD<String,String> requestsIds = requests.mapToPair(new PairFunction<ObjectNode, String, String>() {
            @Override
            public Tuple2<String, String> call(ObjectNode jsonNodes) throws Exception {
                String id = jsonNodes.get("id").asText();
                return new Tuple2<String, String>(id,id);
            }
        }).distinct();

        createOutputType(jsons,"RecommendationList", outputPath, requestsIds);

    } catch (IOException e) {
        log.error(e);
        System.exit(1);
    } catch (NumberFormatException e) {
        log.error("filter factor is not a valid number!!");
        System.exit(-1);
    }

    sc.stop();

}

private static JavaRDD<ObjectNode> createOutputType(JavaRDD jsonsList, final String type, String outputPath,JavaPairRDD<String,String> joinKeys) {

    outputPath = outputPath + "/" + type;

    JavaRDD events = jsonsList.filter(new Function<ObjectNode, Boolean>() {
        @Override
        public Boolean call(ObjectNode jsonNodes) throws Exception {
            return jsonNodes.get("type").asText().equals(type);
        }
    });


    // This is in case we need to narrow the list to match some other list of ids... Recommendation List, for example... :)
    if(joinKeys != null) {
        JavaPairRDD<String,ObjectNode> keyedEvents = events.mapToPair(new PairFunction<ObjectNode, String, ObjectNode>() {
            @Override
            public Tuple2<String, ObjectNode> call(ObjectNode jsonNodes) throws Exception {
                return new Tuple2<String, ObjectNode>(jsonNodes.get("requestId").asText(),jsonNodes);
            }
        });

        JavaRDD<ObjectNode> joinedEvents = joinKeys.join(keyedEvents).values().map(new Function<Tuple2<String, ObjectNode>, ObjectNode>() {
           @Override
           public ObjectNode call(Tuple2<String, ObjectNode> stringObjectNodeTuple2) throws Exception {
               return stringObjectNodeTuple2._2;
           }
        });
        events = joinedEvents;
    }


    JavaPairRDD<String,Iterable<ObjectNode>> groupedEvents = events.mapToPair(new PairFunction<ObjectNode, String, ObjectNode>() {
        @Override
        public Tuple2<String, ObjectNode> call(ObjectNode jsonNodes) throws Exception {
            return new Tuple2<String, ObjectNode>(jsonNodes.get("sliceKey").asText(),jsonNodes);
        }
    }).groupByKey();
    // Add convert jsons to strings and add "\n" at the end of each

    JavaPairRDD<String, String> groupedStrings = groupedEvents.mapToPair(new PairFunction<Tuple2<String, Iterable<ObjectNode>>, String, String>() {
        @Override
        public Tuple2<String, String> call(Tuple2<String, Iterable<ObjectNode>> content) throws Exception {
            String string = jsonsToString(content._2);
            log.error(string);
            return new Tuple2<>(content._1, string);
        }
    });
    groupedStrings.saveAsHadoopFile(outputPath, String.class, String.class, KeyBasedMultipleTextOutputFormat.class);
    return events;
}

// Notice the special case of if(joinKeys != null) in which I join the recommendations with request ids.

最後に、Spark ジョブを開始するために使用するコマンドは次のとおりです。

spark-submit --class com.doit.customer.dataconverter.Phase0 --driver-cores 8 --total-executor-cores 632 --driver-memory 40g --executor-memory 40G --deploy-mode cluster /home/hadoop/hadoop-install/phase0-1.0-SNAPSHOT-jar-with-dependencies.jar gs://input/2014_07_31* gs://output/2014_07_31 100 remote
4

1 に答える 1

2

初期パーティションは、ルート (sc.parallelize(pathsStr)) 内の一連のフォルダーに基づいています。フローには、パーティションのバランスを著しく崩す可能性のある 2 つのステップがあります。2) 一部のファイルが他のファイルよりも多くの行を持っている場合、各ファイルから TSV 行を読み取る。

ファイルのサイズがほぼ同じで、一部のフォルダーに他のフォルダーよりも多くのファイルがある場合は、ファイル名を収集した後でパーティションのバランスを調整できます。filePaths の初期値を設定したら、次の行を追加してみてください。

filePaths = filePaths.repartition(sc.defaultParallelism());

これにより、収集されたファイル名がバランスの取れたパーティションにシャッフルされます。

一部のファイルが他のファイルよりも大幅に大きいために不均衡が生じている場合は、同様に repartition を呼び出して typedData RDD のバランスを取り直すことができます。

あるいは、filePath のバランスを取り直しても、いくつかのパーティションに収まるやや大きなファイルが多数あるためにパーティションの不均衡が残る場合は、repartition 引数でより大きな数を使用することで、パフォーマンスを少し改善できる可能性があります。コア数の 4 倍のパーティションが得られるように、4 倍します。これにより、通信コストが少し増加しますが、typedData で結果として得られるパーティション サイズのバランスが改善される場合は有利になる可能性があります。

于 2014-11-21T03:58:28.133 に答える