6

Spark SQLクエリに使用されるdataRDDとnewPairDataRDDの2つのspark RDDがあります。アプリケーションが初期化されると、dataRDD が初期化されます。指定された 1 つの hbase エンティティ内のすべてのデータが dataRDD に格納されます。

クライアントの sql クエリが来ると、私の APP はすべての新しい更新と newPairDataRDD への挿入を取得します。dataRDD は newPairDataRDD を結合し、spark SQL コンテキストでテーブルとして登録します。

dataRDD に 0 レコード、newPairDataRDD に新たに挿入された 1 レコードも見つかりました。結合には 4 秒かかります。それは遅すぎる

合理的ではないと思います。速くする方法を知っている人はいますか?以下のような簡単なコードに感謝します

    // Step1: load all data from hbase to dataRDD when initial, this only run once. 
    JavaPairRDD<String, Row>  dataRDD= getAllBaseDataToJavaRDD();
    dataRDD.cache();
    dataRDD.persist(StorageLevel.MEMORY_ONLY());
    logger.info(dataRDD.count());

    // Step2: when spark sql query coming, load latest updated and inserted data from db to newPairDataRDD

    JavaPairRDD<String, Row> newPairDataRDD = getUpdateOrInstertBaseDataToJavaRDD();
    // Step3: if count>0 do union and reduce

       if(newPairDataRDD.count() > 0) {

        JavaPairRDD<String, Row> unionedRDD =dataRDD.union(newPairDataRDD);

    // if data was updated in DB, need to delete the old version from the dataRDD.

        dataRDD = unionedRDD.reduceByKey(
            new Function2<Row, Row, Row>() {
            // @Override
            public Row call(Row r1, Row r2) {
             return r2;
             }
            });
    }
//step4: register the dataRDD
JavaSchemaRDD schemaRDD = sqlContext.applySchema(dataRDD..values(), schema);

//step5: execute sql query
retRDD = sqlContext.sql(sql);
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

Spark Web UI から、以下を確認できます。どうやら結合には4が必要なようです

完了したステージ (8)

ステージ ID 説明 提出された期間 タスク: 成功/合計入力 シャッフル読み取り シャッフル書き込み

6 SparkPlan.scala:85+details で収集 2015 年 1 月 4 日 8:17 2 秒 8 月 8 日 156.0 B

SparkSqlQueryForMarsNew.java:389+details での 7 ユニオン 2015 年 1 月 4 日 8:17 4 秒 8 月 8 日 64.0 B 156.0 B

4

2 に答える 2

1

目的を達成するためのより効率的な方法は、 acogroup()と aを使用することですflatMapValues()。共用体を使用しても、 に新しいパーティションを追加する以外はほとんど機能しません。dataRDDつまり、すべてのデータを の前にシャッフルする必要がありますreduceByKey()。のみの再分割を引き起こしcogroup()ます。flatMapValues()newPairDataRDD

JavaPairRDD<String, Tuple2<List<Row>, List<Row>>> unionedRDD = dataRDD.cogroup(newPairDataRDD);
JavaPairRDD<String, Row> updated = unionedRDD.flatMapValues(
    new Function<Tuple2<List<Row>, List<Row>>, Iterable<Row>>() {
        public Iterable<Row> call(Tuple2<List<Row>, List<Row>> grouped) {
            if (grouped._2.nonEmpty()) {
                return grouped._2;
            } else {
                return grouped._1;
            }
        }
    });

またはScalaで

val unioned = dataRDD.cogroup(newPairDataRDD)
val updated = unioned.flatMapValues { case (oldVals, newVals) =>
    if (newVals.nonEmpty) newVals else oldVals
}

免責事項、私は Java で spark を書くことに慣れていません! 上記が間違っている場合は、誰かが私を修正してください!

于 2015-11-21T15:15:14.487 に答える