2

パーティション分割されたディレクトリに次の寄木細工のファイルがあります。

/files/dataset
  /id=1
       parquet.gz 
  /id=2
       parquet.gz      
  /id=3
       parquet.gz

spark1.6 では、これらは次のようにアクセスできます。

val arr = sqlContext.read.parquet("/files/dataset/").collect

ただし、spark2.0.1 では、このコードはエラーをスローします。

val arr = spark.read.parquet("/files/dataset/").collect


java.lang.NullPointerException
    at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getLong(OnHeapColumnVector.java:272)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745) 

個々のパーティション ディレクトリは個別に読み取ることができ、統合することもできますが、どのような不一致を探すべきかについて興味があります。

更新: 分割されたディレクトリは、df.where(id=1).write.parquetたとえばdf.write.partitionBy. これが問題の根本にあるようです。ただし、以前のバージョンのスパークで読み取り/収集が成功する理由を積極的に判断しようとしています。

更新: 上記の 'id' 列は Long であり、明示的に書き込むと (例: df.write.parquet('/files/dataset/id=1')) 読み取り中にエラーがスローされます。パーティション検出は明らかにパーティションをLong ではなく IntType https://issues.apache.org/jira/browse/SPARK-18108を参照

4

0 に答える 0