json ファイルからの情報を使用してカスタム org.apache.spark.sql.types.StructType スキーマ オブジェクトを作成する必要があります。json ファイルは何でもかまいません。そのため、プロパティ ファイル内でパラメトリック化しました。
プロパティファイルは次のようになります。
//ruta al esquema del fichero output (por defecto se infiere el esquema del Parquet destino). Si existe, el esquema será en formato JSON, aplicable a DataFrame (ver StructType.fromJson)
schema.parquet=/Users/XXXX/Desktop/generated_schema.json
writing.mode=overwrite
separator=;
header=false
generated_schema.json ファイルは次のようになります。
{"type" : "struct","fields" : [ {"name" : "codigo","type" : "string","nullable" : true}, {"name":"otro", "type":"string", "nullable":true}, {"name":"vacio", "type":"string", "nullable":true},{"name":"final","type":"string","nullable":true} ]}
だから、これは私がそれを解決できると思った方法です:
val path: Path = new Path(mra_schema_parquet)
val fileSystem = path.getFileSystem(sc.hadoopConfiguration)
val inputStream: FSDataInputStream = fileSystem.open(path)
val schema_json = Stream.cons(inputStream.readLine(), Stream.continually( inputStream.readLine))
System.out.println("schema_json looks like " + schema_json.head)
val mySchemaStructType :DataType = DataType.fromJson(schema_json.head)
/*
After this line, mySchemaStructType have four StructFields objects inside it, the same than appears at schema_json
*/
logger.info(mySchemaStructType)
val myStructType = new StructType()
myStructType.add("mySchemaStructType",mySchemaStructType)
/*
After this line, myStructType have zero StructFields! here must be the bug, myStructType should have the four StructFields that represents the loaded schema json! this must be the error! but how can i construct the necessary StructType object?
*/
myDF = loadCSV(sqlContext, path_input_csv,separator,myStructType,header)
System.out.println("myDF.schema.json looks like " + myDF.schema.json)
inputStream.close()
df.write
.format("com.databricks.spark.csv")
.option("header", header)
.option("delimiter",delimiter)
.option("nullValue","")
.option("treatEmptyValuesAsNulls","true")
.mode(saveMode)
.parquet(pathParquet)
コードが最後の行 .parquet(pathParquet) を実行すると、例外が発生します。
**parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: message root {
}**
このコードの出力は次のようになります。
16/11/11 13:57:04 INFO AnotherCSVtoParquet$: The job started using this propertie file: /Users/aisidoro/Desktop/mra-csv-converter/parametrizacion.properties
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: path_input_csv is /Users/aisidoro/Desktop/mra-csv-converter/cds_glcs.csv
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: path_output_parquet is /Users/aisidoro/Desktop/output900000
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: mra_schema_parquet is /Users/aisidoro/Desktop/mra-csv-converter/generated_schema.json
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: writting_mode is overwrite
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: separator is ;
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: header is false
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: ATTENTION! aplying mra_schema_parquet /Users/aisidoro/Desktop/mra-csv-converter/generated_schema.json
schema_json looks like {"type" : "struct","fields" : [ {"name" : "codigo","type" : "string","nullable" : true}, {"name":"otro", "type":"string", "nullable":true}, {"name":"vacio", "type":"string", "nullable":true},{"name":"final","type":"string","nullable":true} ]}
16/11/11 13:57:12 INFO AnotherCSVtoParquet$: StructType(StructField(codigo,StringType,true), StructField(otro,StringType,true), StructField(vacio,StringType,true), StructField(final,StringType,true))
16/11/11 13:57:13 INFO AnotherCSVtoParquet$: loadCSV. header is false, inferSchema is false pathCSV is /Users/aisidoro/Desktop/mra-csv-converter/cds_glcs.csv separator is ;
myDF.schema.json looks like {"type":"struct","fields":[]}
schema_json オブジェクトと myDF.schema.json オブジェクトの内容は同じでなければなりません。しかし、それは起こりませんでした。これでエラーが発生するはずだと思います。
最後に、この例外でジョブがクラッシュします。
**parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: message root {
}**
実際には、json スキーマ ファイルを提供しない場合、ジョブは正常に実行されますが、このスキーマでは...
誰でも私を助けることができますか?csv ファイルと json スキーマ ファイルから始めて、いくつかの寄木細工のファイルを作成したいだけです。
ありがとうございました。
依存関係は次のとおりです。
<spark.version>1.5.0-cdh5.5.2</spark.version>
<databricks.version>1.5.0</databricks.version>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>${databricks.version}</version>
</dependency>
アップデート
未解決の問題があることがわかりますが、