私は scala(2.11) と spark (1.6.0) の新しいプログラマーで、spark-csv パッケージなしで RDD を DF に変換しようとしています (練習のためだけでなく、いくつかの技術的な問題のために)。Spark のスターター ガイドと、関連するすべてのスタック オーバーフローの投稿を読んだ後、いくつかのメソッド (4) を機能させる方法がわかりません。
それらのいずれかに関するすべてのヘルプは驚くべきものになります!
txt ファイルのような単純なテーブルがあります。
Jorgito 10 1 Soltero
Juanito 20 2 Casado
Jaimito 30 3 Divociado
私はいくつかの準備をコーディングします:
var RDD_filas = RDD_datos.map(_.split("\t"))
var esquema = new StructType()
.add("Nombre", StringType)
.add("Edad", IntegerType)
.add("Hijos",IntegerType)
.add("EC",StringType)
import org.apache.spark.sql._
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
import org.apache.spark.sql.SQLContext
case class X(Nombre: String, Edad: Int, Hijos: Int, EC: String)
そして、私が見たすべての方法を適用しますが、うまくいきません:
var DF_datos = RDD_filas.map({case Array(s0, s1, s2, s3) => X(s0, s1.trim.toInt, s2.trim.toInt, s3)}).toDF("Nombre","Edad","Hijos","EC")
var DF_datos2 = RDD_filas.map(p => X(p(0), p(1).trim.toInt,p(2).trim.toInt,p(3))).toDF("Nombre","Edad","Hijos","EC")
var DF_datos3 = RDD_filas.map(Array(s0, s1, s2, s3) => Array(s0, s1.trim.toInt, s2.trim.toInt, s3)).toDF("Nombre","Edad","Hijos","EC")
var DF_datos4 = sqlContext.createDataFrame(RDD_filas,esquema)
最初の 3 つのメソッドを使用すると、DF を作成してスキーマを出力できますが、ヘッダーがなく (DF_datos.header() は最初の行を返します)、DF_datos.show() を試すとエラーが発生します。 1つ(私にとって)は4番です。これは、最も「標準的な」方法であると想定されているためです。
これだけが私のために働いた:
var a = RDD_datos.map(_.split(" ")).take(3)
val rdd = sc.makeRDD(a)
val df = rdd.map {case Array(s0, s1, s2, s3) => X(s0, s1.toInt, s2.toInt, s3)}.toDF()