Spark SQLクエリに使用されるdataRDDとnewPairDataRDDの2つのspark RDDがあります。アプリケーションが初期化されると、dataRDD が初期化されます。指定された 1 つの hbase エンティティ内のすべてのデータが dataRDD に格納されます。
クライアントの sql クエリが来ると、私の APP はすべての新しい更新と newPairDataRDD への挿入を取得します。dataRDD は newPairDataRDD を結合し、spark SQL コンテキストでテーブルとして登録します。
dataRDD に 0 レコード、newPairDataRDD に新たに挿入された 1 レコードも見つかりました。結合には 4 秒かかります。それは遅すぎる
合理的ではないと思います。速くする方法を知っている人はいますか?以下のような簡単なコードに感謝します
// Step1: load all data from hbase to dataRDD when initial, this only run once.
JavaPairRDD<String, Row> dataRDD= getAllBaseDataToJavaRDD();
dataRDD.cache();
dataRDD.persist(StorageLevel.MEMORY_ONLY());
logger.info(dataRDD.count());
// Step2: when spark sql query coming, load latest updated and inserted data from db to newPairDataRDD
JavaPairRDD<String, Row> newPairDataRDD = getUpdateOrInstertBaseDataToJavaRDD();
// Step3: if count>0 do union and reduce
if(newPairDataRDD.count() > 0) {
JavaPairRDD<String, Row> unionedRDD =dataRDD.union(newPairDataRDD);
// if data was updated in DB, need to delete the old version from the dataRDD.
dataRDD = unionedRDD.reduceByKey(
new Function2<Row, Row, Row>() {
// @Override
public Row call(Row r1, Row r2) {
return r2;
}
});
}
//step4: register the dataRDD
JavaSchemaRDD schemaRDD = sqlContext.applySchema(dataRDD..values(), schema);
//step5: execute sql query
retRDD = sqlContext.sql(sql);
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
Spark Web UI から、以下を確認できます。どうやら結合には4が必要なようです
完了したステージ (8)
ステージ ID 説明 提出された期間 タスク: 成功/合計入力 シャッフル読み取り シャッフル書き込み
6 SparkPlan.scala:85+details で収集 2015 年 1 月 4 日 8:17 2 秒 8 月 8 日 156.0 B
SparkSqlQueryForMarsNew.java:389+details での 7 ユニオン 2015 年 1 月 4 日 8:17 4 秒 8 月 8 日 64.0 B 156.0 B