私が遭遇した特定の例外に由来する一般的な質問があります。
spark 1.6 を使用して、dataproc でデータのクエリを実行しています。2 つのログから 1 日分のデータ (~10000 ファイル) を取得し、いくつかの変換を行う必要があります。
ただし、1 日のクエリが成功しなかった後、データに不良データが含まれている可能性があります (または含まれていない可能性があります)。時間 00-09 を試してもエラーは発生しませんでした。10〜19時間試してみましたが、例外が発生しました。1 時間ごとに試してみたところ、10 時間に不正なデータがあることがわかりました。11時と12時は大丈夫だった
基本的に私のコードは次のとおりです。
val imps = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").option("inferSchema", "true").load("gs://logs.xxxx.com/2016/03/14/xxxxx/imps/2016-03-14-10*").select("C0","C18","C7","C9","C33","C29","C63").registerTempTable("imps")
val conv = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").option("inferSchema", "true").load("gs://logs.xxxx.com/2016/03/14/xxxxx/conv/2016-03-14-10*").select("C0","C18","C7","C9","C33","C29","C65").registerTempTable("conversions")
val ff = sqlContext.sql("select * from (select * from imps) A inner join (select * from conversions) B on A.C0=B.C0 and A.C7=B.C7 and A.C18=B.C18 ").coalesce(16).write.format("com.databricks.spark.csv").save("gs://xxxx-spark-results/newSparkResults/Plara2.6Mar14_10_1/")
{オーバー - 簡略化}
私が得るエラーは次のとおりです。
org.apache.spark.SparkException: Job aborted due to stage failure: Task 38 in stage 130.0 failed 4 times, most recent failure: Lost task 38.3 in stage 130.0 (TID 88495, plara26-0317-0001-sw-v8oc.c.xxxxx-analytics.internal): java.lang.NumberFormatException: null
at java.lang.Integer.parseInt(Integer.java:542)
at java.lang.Integer.parseInt(Integer.java:615)
at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:53)
at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:181)
at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:162)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:511)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
私の質問は - spark-csv を使用して例外処理を実装する方法は? データフレームをRDDに変換してそこで作業することはできますが、もっと良い方法があるはずです.....
誰かが同様の問題を解決しましたか?