スキーマを提供しながら、csv ファイルを分散データフレーム (ddf) にロードしようとしています。ddf はロードされますが、タイムスタンプ列には null 値しか表示されません。これは、spark が特定の形式のタイムスタンプを想定しているためだと思います。だから私は2つの質問があります:
1) どのようにスパークにフォーマットを与えたり、フォーマットを検出させたりするのですか (のように
"MM/dd/yyyy' 'HH:mm:ss"
)
2) 1 がオプションでない場合 (文字列としてインポートしたと仮定して) フィールドをタイムスタンプに変換する方法。
Q2では、次を使用してみました:
def convert(row :org.apache.spark.sql.Row) :org.apache.spark.sql.Row = {
import org.apache.spark.sql.Row
val format = new java.text.SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
val d1 = getTimestamp(row(3))
return Row(row(0),row(1),row(2),d1);
}
val rdd1 = df.map(convert)
val df1 = sqlContext.createDataFrame(rdd1,schema1)
最後のステップは、終了させない null 値があるため機能しません。次のようなエラーが表示されます。
java.lang.RuntimeException: Failed to check null bit for primitive long value.
ただし、sqlContext.load は問題なく csv をロードできます。
val df = sqlContext.load("com.databricks.spark.csv", schema, Map("path" -> "/path/to/file.csv", "header" -> "true"))