ほとんどの場合に起動する次のコードがありますhiveContext.sql()
。私の仕事は、いくつかのテーブルを作成し、すべてのハイブ テーブル パーティションの処理後に値を挿入することです。
したがって、最初に起動show partitions
し、その出力を for ループで使用して、テーブルを作成し (テーブルが存在しない場合)、 を使用してテーブルに挿入するいくつかのメソッドを呼び出しますhiveContext.sql
。
現在、エグゼキューターで実行することはできないためhiveContext
、ドライバー プログラムの for ループでこれを実行する必要があり、1 つずつシリアルに実行する必要があります。YARN クラスターでこの Spark ジョブを送信すると、ほとんどの場合、シャッフルが見つからないという例外が原因でエグゼキューターが失われます。
これは、メモリの過負荷のために YARN がエグゼキュータを強制終了しているためです。ハイブ パーティションごとに非常に小さなデータ セットがあるため、理由はわかりませんが、それでも YARN がエグゼキュータを強制終了します。
次のコードは、すべてを並行して実行し、すべてのハイブ パーティション データを同時にメモリに格納しようとしますか?
public static void main(String[] args) throws IOException {
SparkConf conf = new SparkConf();
SparkContext sc = new SparkContext(conf);
HiveContext hc = new HiveContext(sc);
DataFrame partitionFrame = hiveContext.sql(" show partitions dbdata partition(date="2015-08-05")");
Row[] rowArr = partitionFrame.collect();
for(Row row : rowArr) {
String[] splitArr = row.getString(0).split("/");
String server = splitArr[0].split("=")[1];
String date = splitArr[1].split("=")[1];
String csvPath = "hdfs:///user/db/ext/"+server+".csv";
if(fs.exists(new Path(csvPath))) {
hiveContext.sql("ADD FILE " + csvPath);
}
createInsertIntoTableABC(hc,entity, date);
createInsertIntoTableDEF(hc,entity, date);
createInsertIntoTableGHI(hc,entity,date);
createInsertIntoTableJKL(hc,entity, date);
createInsertIntoTableMNO(hc,entity,date);
}
}