6

Apache Zeppelin ノートブックを使用して spark-csv [1] で CSV ファイルを Spark データ フレームにロードしようとしていますが、値のない数値フィールドをロードすると、その行のパーサーが失敗し、その行がスキップされます。

行がロードされ、データフレームの値が行をロードし、値が NULL に設定されて、集計が値を無視するようになると予想していました。

%dep
z.reset()
z.addRepo("my-nexus").url("<my_local_nexus_repo_that_is_a_proxy_of_public_repos>")
z.load("com.databricks:spark-csv_2.10:1.1.0")


%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._

val schema = StructType(
    StructField("identifier", StringType, true) ::
    StructField("name", StringType, true) ::
    StructField("height", DoubleType, true) :: 
    Nil)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
                        .schema(schema)
                        .option("header", "true")
                        .load("file:///home/spark_user/data.csv")

df.describe("height").show()

データ ファイルの内容は次のとおりです: /home/spark_user/data.csv

identifier,name,height
1,sam,184
2,cath,180
3,santa,     <-- note that there is not height recorded for Santa !

出力は次のとおりです。

+-------+------+
|summary|height|
+-------+------+
|  count|     2|    <- 2 of 3 lines loaded, ie. sam and cath
|   mean| 182.0|
| stddev|   2.0|
|    min| 180.0|
|    max| 184.0|
+-------+------+

zeppelin のログで、サンタの行を解析する際に次のエラーが表示されます。

ERROR [2015-07-21 16:42:09,940] ({Executor task launch worker-45} CsvRelation.scala[apply]:209) - Exception while parsing line: 3,santa,.
        java.lang.NumberFormatException: empty String
        at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842)
        at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
        at java.lang.Double.parseDouble(Double.java:538)
        at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232)
        at scala.collection.immutable.StringOps.toDouble(StringOps.scala:31)
        at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:42)
        at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:198)
        at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:180)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:129)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

だから、あなたは私にこれまでのところとても良いと言うかもしれません...そしてあなたは正しいでしょう;)

ここで、年齢などの追加の列を追加したいと思います。そのフィールドには常にデータがあります。

identifier,name,height,age
1,sam,184,30
2,cath,180,32
3,santa,,70

ここで、年齢に関するいくつかの統計を丁寧に尋ねます。

%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._

val schema = StructType(
    StructField("identifier", StringType, true) ::
    StructField("name", StringType, true) ::
    StructField("height", DoubleType, true) :: 
    StructField("age", DoubleType, true) :: 
    Nil)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
                        .schema(schema)
                        .option("header", "true")
                        .load("file:///home/spark_user/data2.csv")

df.describe("age").show()

結果

+-------+----+
|summary| age|
+-------+----+
|  count|   2|
|   mean|31.0|
| stddev| 1.0|
|    min|30.0|
|    max|32.0|
+-------+----+

すべて間違っています!サンタの身長がわからないため、行全体が失われ、年齢の計算はサムとキャスのみに基づいて行われますが、サンタの年齢は完全に有効です。

私の質問は、CSV をロードできるようにサンタの身長をプラグインするために必要な値は何かということです。スキーマをすべて StringType に設定しようとしましたが、その後

次の質問は、

API で、spark を使用して N/A 値を処理できることがわかりました。そのため、すべての列を StringType に設定してデータをロードし、クリーンアップを行ってから、以下に示すようにスキーマのみを適切に設定できるのではないかと考えました。

%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._

val schema = StructType(
StructField("identifier", StringType, true) ::
StructField("name", StringType, true) ::
StructField("height", StringType, true) ::
StructField("age", StringType, true) ::
Nil)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv").schema(schema).option("header", "true").load("file:///home/spark_user/data.csv")

// eg. for each column of my dataframe, replace empty string by null
df.na.replace( "*", Map("" -> null) )

val toDouble = udf[Double, String]( _.toDouble)
df2 = df.withColumn("age", toDouble(df("age")))

df2.describe("age").show()

しかし、 df.na.replace() は例外をスローして停止します:

java.lang.IllegalArgumentException: Unsupported value type java.lang.String ().
        at org.apache.spark.sql.DataFrameNaFunctions.org$apache$spark$sql$DataFrameNaFunctions$$convertToDouble(DataFrameNaFunctions.scala:417)
        at org.apache.spark.sql.DataFrameNaFunctions$$anonfun$4.apply(DataFrameNaFunctions.scala:337)
        at org.apache.spark.sql.DataFrameNaFunctions$$anonfun$4.apply(DataFrameNaFunctions.scala:337)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.DataFrameNaFunctions.replace0(DataFrameNaFunctions.scala:337)
        at org.apache.spark.sql.DataFrameNaFunctions.replace(DataFrameNaFunctions.scala:304)

どんな助け、&ヒントも大歓迎です!!

[1] https://github.com/databricks/spark-csv

4

1 に答える 1