spark2 ストリームを介していくつかのデータを処理し、それらを hdfs に保存しようとしています。ストリームの実行中に、単純な選択でリサイクルサーバーを介して保存されたデータを読み取りたい:
SELECT COUNT(*) FROM stream_table UNION ALL SELECT COUNT(*) FROM thisistable;
しかし、私はこの例外を受けています
エラー: org.apache.spark.SparkException: ステージの失敗によりジョブが中止されました: ステージ 5.0 のタスク 0 が 1 回失敗しました。最近の失敗: ステージ 5.0 でタスク 0.0 が失われました (TID 6、localhost): java.lang.RuntimeException: hdfs ://5b6b8bf723a2:9000/archiveData/parquets/efc44dd4-1792-4b6d-b0f2-120818047b1b は、parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:412) の parquet.hadoop.ParquetFileReader にある (小さすぎる) Parquet ファイルではありません.readFooter(ParquetFileReader.java:385) at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:371) at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.getSplit(ParquetRecordReaderWrapper.java:252) ) org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:99) org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:85) で org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java) で:72) org.apache.spark.rdd.HadoopRDD$$anon$1 で (HadoopRDD.scala:246) org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102) で org.apache.spark .rdd.RDD.iterator(RDD.scala:283)(RDD.scala:319) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)scala:38) at org.apache.spark. rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) org.apache.spark.rdd.RDD.iterator(RDD.scala:283)(RDD.scala:319) org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala:319)scala:38) org.apache.spark.rdd.MapPartitionsRDD で。compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)(RDD.scala:319) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala) :319)scala:38) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) で org.apache.spark.scheduler.Task.run(Task.scala:85)ShuffleMapTask.scala:47 で) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) で java.lang.Thread.run(Thread.java:745) または $Worker.run(ThreadPoolExecutor.java:617) でjava.lang.Thread.run(Thread. java:745)or$Worker.run(ThreadPoolExecutor.java:617)java.lang.Thread.run(Thread. java:745)or$Worker.run(ThreadPoolExecutor.java:617)
私の仮定では、スパークはバッチの開始時に空の寄木細工のファイルを作成し、バッチの最後にそれを埋めselect
ます.アーカイブされたファイルを介してAを実行していますが、実際のバッチはまだ終了していないため、1つは空です.
簡単なスパーク ストリームの例 (変換遅延のシミュレーションのための Thread.sleep)
spark
.readStream()
.schema(schema)
.json("/tmp")
.filter(x->{
Thread.sleep(1000);
return true;
})
.writeStream()
.format("parquet")
.queryName("thisistable")
.start()
.awaitTermination();
この例外を回避し、thrift サーバーで完成したファイルのみを取得する方法はありますか?