2

私は現在、spark 2.0.1 を使用しており、insertInto() を使用して「パーティション化されたテーブル ハイブ」にデータセットを保存しようとしています。しかし、この 2 つの方法では、データセットの各パーティションは順番に 1 つずつ保存されます。非常に遅いです。insertInto() または partitionBy() を一度に 1 つずつ使用する必要があることは既にわかっています。spark.2.0.1 Dataframe には Resilient Data Set があると思います。私の現在のコード:

df.write.mode(SaveMode.Append).partitionBy("col").save("s3://bucket/diroutput")

または

df.write.mode(SaveMode.Append).insertInto("TableHivealreadypartitioned")

だから私はこのような df.foreachPartition でいくつかのものを試してみてください:

df.foreachPartition{datasetpartition => datasetpartition.foreach(row => row.sometransformation)}

以下に抽出ログがあります。最初の例では、ハイブの「InserInto(tablehivealreadypartitionned)」です。すべての「パーティション」Spark が 1 つずつ書き込まれていることがわかります。2 番目の例では、S3 に直接書き込むのは「partitionBy().save()」です。また、すべての「パーティション」スパークが 1 つずつ書き込まれていることもわかります。私たちが扱うデータフレームには「パーティション」が 1 つしかなく、そのサイズは圧縮されていない (メモリ内で) 約 200MB です。ジョブは、オプション local[4] を使用してデータを保存するのに 120 秒 170 秒かかります。

[INFO] 2016-11-03 00:10:33,255 org.apache.spark.SparkContext logInfo - Created broadcast 2330 from broadcast at TorExitLookup.scala:43
[INFO] 2016-11-03 00:10:35,302 org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer logInfo - Sorting complete. Writing out partition files one at a time.
[INFO] 2016-11-03 00:10:35,363 com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream close - close closed:false s3://BUCKETS3/db/.hive-staging_hive_2016-11-03_00-10-29_426_1749488585639143697-1/-ext-10000/tsbucket=2016-11-02 09%3A00%3A00/part-00001
[INFO] 2016-11-03 00:10:35,380 org.apache.spark.mapred.SparkHadoopMapRedUtil logInfo - No need to commit output of task because needsTaskCommit=false: attempt_201611030010_0948_m_000001_0
[INFO] 2016-11-03 00:10:35,380 org.apache.spark.executor.Executor logInfo - Finished task 1.0 in stage 948.0 (TID 1385). 2652 bytes result sent to driver
[INFO] 2016-11-03 00:10:35,381 org.apache.spark.scheduler.TaskSetManager logInfo - Finished task 1.0 in stage 948.0 (TID 1385) in 5718 ms on localhost (1/2)
[INFO] 2016-11-03 00:11:23,033 org.apache.spark.storage.BlockManagerInfo logInfo - Removed broadcast_2330_piece0 on 10.0.193.149:34016 in memory (size: 6.9 KB, free: 414.4 MB)
[INFO] 2016-11-03 00:11:58,194 org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer logInfo - Sorting complete. Writing out partition files one at a time.
[INFO] 2016-11-03 00:12:00,210 org.apache.spark.storage.BlockManagerInfo logInfo - Removed broadcast_2329_piece0 on 10.0.193.149:34016 in memory (size: 6.9 KB, free: 414.4 MB)
[INFO] 2016-11-03 00:12:05,295 com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream close - close closed:false s3://BUCKETS3/db/.hive-staging_hive_2016-11-03_00-10-29_426_1749488585639143697-1/-ext-10000/tsbucket=2016-11-02 09%3A00%3A00/part-00000
[INFO] 2016-11-03 00:12:05,831 org.apache.spark.mapred.SparkHadoopMapRedUtil logInfo - No need to commit output of task because needsTaskCommit=false: attempt_201611030010_0948_m_000000_0
[INFO] 2016-11-03 00:12:05,835 org.apache.spark.executor.Executor logInfo - Finished task 0.0 in stage 948.0 (TID 1384). 2652 bytes result sent to driver
[INFO] 2016-11-03 00:12:05,835 org.apache.spark.scheduler.TaskSetManager logInfo - Finished task 0.0 in stage 948.0 (TID 1384) in 96173 ms on localhost (2/2)
[INFO] 2016-11-03 00:12:05,835 org.apache.spark.scheduler.DAGScheduler logInfo - ResultStage 948 (insertInto at ImportHive.scala:24) finished in 96,173 s
[INFO] 2016-11-03 00:12:05,835 org.apache.spark.scheduler.TaskSchedulerImpl logInfo - Removed TaskSet 948.0, whose tasks have all completed, from pool
[INFO] 2016-11-03 00:12:05,836 org.apache.spark.scheduler.DAGScheduler logInfo - Job 948 finished: insertInto at ImportHive.scala:24, took 96,188035 s


[INFO] 2016-11-03 00:12:17,171 org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer logInfo - Sorting complete. Writing out partition files one at a time.
[INFO] 2016-11-03 00:12:17,296 com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream close - close closed:false s3://BUCKETS3/rescue/tsbucket=2016-11-02 09%3A00%3A00/part-r-00001-f433a41e-1b59-49af-b232-cf701e0c6df9.zlib.orc
[INFO] 2016-11-03 00:12:17,388 org.apache.spark.mapred.SparkHadoopMapRedUtil logInfo - No need to commit output of task because needsTaskCommit=false: attempt_201611030012_0949_m_000001_0
[INFO] 2016-11-03 00:12:17,388 org.apache.spark.executor.Executor logInfo - Finished task 1.0 in stage 949.0 (TID 1387). 2652 bytes result sent to driver
[INFO] 2016-11-03 00:12:17,389 org.apache.spark.scheduler.TaskSetManager logInfo - Finished task 1.0 in stage 949.0 (TID 1387) in 6892 ms on localhost (1/2)
[INFO] 2016-11-03 00:12:57,467 org.apache.spark.storage.BlockManagerInfo logInfo - Removed broadcast_2333_piece0 on 10.0.193.149:34016 in memory (size: 6.9 KB, free: 414.4 MB)
[INFO] 2016-11-03 00:13:36,195 org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer logInfo - Sorting complete. Writing out partition files one at a time.
[INFO] 2016-11-03 00:13:43,689 com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream close - close closed:false s3://BUCKETS3/rescue/tsbucket=2016-11-02 09%3A00%3A00/part-r-00000-f433a41e-1b59-49af-b232-cf701e0c6df9.zlib.orc
[INFO] 2016-11-03 00:13:44,258 org.apache.spark.mapred.SparkHadoopMapRedUtil logInfo - No need to commit output of task because needsTaskCommit=false: attempt_201611030012_0949_m_000000_0
[INFO] 2016-11-03 00:13:44,259 org.apache.spark.executor.Executor logInfo - Finished task 0.0 in stage 949.0 (TID 1386). 2652 bytes result sent to driver
[INFO] 2016-11-03 00:13:44,259 org.apache.spark.scheduler.TaskSetManager logInfo - Finished task 0.0 in stage 949.0 (TID 1386) in 93762 ms on localhost (2/2)
[INFO] 2016-11-03 00:13:44,259 org.apache.spark.scheduler.DAGScheduler logInfo - ResultStage 949 (save at ImportHive.scala:30) finished in 93,762 s
[INFO] 2016-11-03 00:13:44,259 org.apache.spark.scheduler.TaskSchedulerImpl logInfo - Removed TaskSet 949.0, whose tasks have all completed, from pool
[INFO] 2016-11-03 00:13:44,259 org.apache.spark.scheduler.DAGScheduler logInfo - Job 949 finished: save at ImportHive.scala:30, took 93,772483 s
[INFO] 2016-11-03 00:13:44,260 org.apache.hadoop.mapreduce.lib.output.DirectFileOutputCommitter cleanupJob - Nothing to clean up since no temporary files were written.
[INFO] 2016-11-03 00:13:44,260 com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream close - close closed:false s3://BUCKETS3/rescue/_SUCCESS
[INFO] 2016-11-03 00:13:44,275 org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer logInfo - Job job_201611030012_0000 committed.

残念ながら、データセットの各スパーク パーティションを並行して書き込み/保存する方法がまだ見つかりません。

誰かがすでにこれを行っていますか?

進め方を教えていただけますか?

向きが悪いのでしょうか?ご協力いただきありがとうございます

4

1 に答える 1

1

私たちが扱うデータフレームには「パーティション」が1つしかなく、そのサイズは非圧縮で約200MBです(メモリ内)

これはあなたの問題です..スパークは、パーティションに基づいてエグゼキューター間で作業を分散します。

並行して作業するには、df に複数のパーティションが必要です。次を使用してこれを行うことができます。

df.repartition(number)

また、次を使用していることを確認してください。

hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version","2")

s3への書き込み時。

于 2017-01-02T17:31:08.353 に答える