I'm trying to save a Spark SQL table to a Parquet file. Due to other issues, I need to reduce the number of partitions before writing. My code is

data.coalesce(1000, shuffle = true).saveAsParquetFile("s3n://...")

This throws

java.lang.NullPointerException
        at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:927)
        at org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:174)
        at org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:191)
        at org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$4$$anonfun$apply$2.apply(CoalescedRDD.scala:190)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
        at org.apache.spark.rdd.PartitionCoalescer$LocationIterator.<init>(CoalescedRDD.scala:185)
        at org.apache.spark.rdd.PartitionCoalescer.setupGroups(CoalescedRDD.scala:236)
        at org.apache.spark.rdd.PartitionCoalescer.run(CoalescedRDD.scala:337)
        at org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:83)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1128)
        at org.apache.spark.sql.parquet.InsertIntoParquetTable.saveAsHadoopFile(ParquetTableOperations.scala:318)
        at org.apache.spark.sql.parquet.InsertIntoParquetTable.execute(ParquetTableOperations.scala:246)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409)
        at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:77)
        at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:103)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:22)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
        at $iwC$$iwC$$iwC.<init>(<console>:31)
        at $iwC$$iwC.<init>(<console>:33)
        at $iwC.<init>(<console>:35)
        at <init>(<console>:37)
        at .<init>(<console>:41)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Running the coalesce step with the code changed to use repartition instead of shuffle = true throws the same error; if I drop the coalesce step entirely, the code works fine. I'm using spark-1.1.0.
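
For reference, in the spark-1.1.0 RDD API repartition(n) is defined as coalesce(n, shuffle = true), which would explain why both spellings fail identically. One untested variant that might help isolate the problem (a sketch only; sqlContext is assumed to be the SQLContext that created data) is to shuffle the rows as a plain RDD[Row] and rebuild the SchemaRDD with applySchema before the write:

// Untested sketch: repartition the rows at the plain-RDD level, then
// rebuild the SchemaRDD from the shuffled rows before writing. If this
// still throws the NullPointerException, the problem is in the
// coalesce/repartition lineage itself rather than in how the Parquet
// write is planned over it.
val shuffledRows = data.repartition(1000)                       // same as coalesce(1000, shuffle = true)
val rebuilt = sqlContext.applySchema(shuffledRows, data.schema) // re-attach the original schema
rebuilt.saveAsParquetFile("s3n://...")                          // same destination as above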
