6

いくつかの ETL を使用して、ビッグデータ プロジェクトの AWS Glue を評価しています。S3 から CSV ファイルを正しく取得するクローラーを追加しました。最初は、その CSV を JSON に変換し、そのファイルを別の S3 の場所 (同じバケット、別のパス) にドロップするだけです。

AWS が提供するスクリプトを使用しました (ここではカスタム スクリプトは使用しません)。そして、すべての列をマップしました。

ターゲット フォルダは空ですが (ジョブが作成されたばかりです)、ジョブは「ファイルが既に存在します」で失敗します: ここにスナップショット。ジョブを開始する前に 、出力をドロップするふりをした S3 の場所は空でした。ただし、エラーの後に 2 つのファイルが表示されますが、それらは部分的なもののようです: スナップショット

何が起こっているのかについてのアイデアはありますか?

完全なスタックは次のとおりです。

    コンテナー: ip-172-31-49-38.ec2.internal_8041 の container_1513099821372_0007_01_000001
    LogType:stdout
    ログのアップロード時間:Tue Dec 12 19:12:04 +0000 2017
    ログの長さ:8462
    ログの内容:
    トレースバック (最新の呼び出しが最後):
    ファイル「script_2017-12-12-19-11-08.py」の 30 行目
    datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1、connection_type = "s3"、connection_options =
    {
        "パス": "s3://primero-viz/output/tcw_entries"
    }
    、フォーマット = "json"、transformation_ctx = "datasink2")
    ファイル「/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/dynamicframe.py」、行 523、from_options
    ファイル "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/context.py"、175 行目、write_dynamic_frame_from_options 内
    ファイル "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/context.py"、198 行目、write_from_options 内
    ファイル「/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/data_sink.py」、32行目、書き込み中
    ファイル「/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/PyGlue.zip/awsglue/data_sink.py」、28 行目、writeFrame 内
    ファイル "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py"、1133 行目、__call__ 内
    ファイル「/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/pyspark.zip/pyspark/sql/utils.py」、63 行目、デコ
    ファイル "/mnt/yarn/usercache/root/appcache/application_1513099821372_0007/container_1513099821372_0007_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py"、319 行目、get_return_value
    py4j.protocol.Py4JJavaError: o86.pyWriteDynamicFrame の呼び出し中にエラーが発生しました。
    : org.apache.spark.SparkException: ステージの失敗によりジョブが中止されました: ステージ 0.0 のタスク 0 が 4 回失敗しました。最近の失敗: ステージ 0.0 のタスク 0.3 が失われました (TID 3、ip-172-31-63-141.ec2 .internal、executor 1): java.io.IOException: ファイルは既に存在します:s3://primero-viz/output/tcw_entries/run-1513105898742-part-r-00000
    com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create (S3NativeFileSystem.java:604) で
    org.apache.hadoop.fs.FileSystem.create(FileSystem.java:915) で
    org.apache.hadoop.fs.FileSystem.create (FileSystem.java:896) で
    org.apache.hadoop.fs.FileSystem.create (FileSystem.java:793) で
    com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:176) で
    com.amazonaws.services.glue.hadoop.TapeOutputFormat.getRecordWriter (TapeOutputFormat.scala:65) で
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119) で
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102) で
    org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala:87) で
    org.apache.spark.scheduler.Task.run (Task.scala:99) で
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) で
    java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) で
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) で
    java.lang.Thread.run(Thread.java:748) で

    ドライバースタックトレース:
    org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) で
    org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) で
    org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) で
    scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) で
    scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) で
    org.apache.spark.scheduler.DAGScheduler.abortStage (DAGScheduler.scala:1422) で
    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) で
    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) で
    scala.Option.foreach(Option.scala:257)で
    org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed (DAGScheduler.scala:802) で
    org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive (DAGScheduler.scala:1650) で
    org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive (DAGScheduler.scala:1605) で
    org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive (DAGScheduler.scala:1594) で
    org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) で
    org.apache.spark.scheduler.DAGScheduler.runJob (DAGScheduler.scala:628) で
    org.apache.spark.SparkContext.runJob (SparkContext.scala:1918) で
    org.apache.spark.SparkContext.runJob (SparkContext.scala:1931) で
    org.apache.spark.SparkContext.runJob (SparkContext.scala:1951) で
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1158) で
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) で
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) で
    org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) で
    org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) で
    org.apache.spark.rdd.RDD.withScope(RDD.scala:362) で
    org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset (PairRDDFunctions.scala:1085) で
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1005) で
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:996) で
    org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:996) で
    org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) で
    org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) で
    org.apache.spark.rdd.RDD.withScope(RDD.scala:362) で
    org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile (PairRDDFunctions.scala:996) で
    com.amazonaws.services.glue.HadoopDataSink$$anonfun$2.apply$mcV$sp(DataSink.scala:192) で
    com.amazonaws.services.glue.HadoopDataSink.writeDynamicFrame(DataSink.scala:202) で
    com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:48) で
    sun.reflect.NativeMethodAccessorImpl.invoke0(ネイティブメソッド)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) で
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) で
    java.lang.reflect.Method.invoke(Method.java:498) で
    py4j.reflection.MethodInvoker.invoke (MethodInvoker.java:244) で
    py4j.reflection.ReflectionEngine.invoke (ReflectionEngine.java:357) で
    py4j.Gateway.invoke (Gateway.java:280) で
    py4j.commands.AbstractCommand.invokeMethod (AbstractCommand.java:132) で
    py4j.commands.CallCommand.execute (CallCommand.java:79) で
    py4j.GatewayConnection.run (GatewayConnection.java:214) で
    java.lang.Thread.run(Thread.java:748) で
    原因: java.io.IOException: ファイルは既に存在します:s3://primero-viz/output/tcw_entries/run-1513105898742-part-r-00000
4

2 に答える 2