Predicate Pushdown を含む Spark を介して後でこれらのファイルを要求する利点を得るために、寄木細工のファイルに並べ替えて書き込みたいデータセットがあります。
現在、列ごとの再分割とパーティションの数を使用して、データを特定のパーティションに移動しました。列は、対応するパーティション (0 から (固定) n まで) を識別します。その結果、scala/spark は予期しない結果を生成し、作成されるパーティションが少なくなります (一部は空です)。たぶんハッシュ衝突?
問題を解決するために、理由を見つけようとし、回避策を見つけようとしました。データフレームをrddに変換し、HashPartitionerでpartitionByを使用することで、1つの回避策を見つけました。驚いたことに、期待通りの結果が得られました。しかし、データフレームを RDD に変換することは、私にとっては解決策ではありません。リソースが多すぎるためです。
この環境をテストしました
cloudera CDH 5.9.3 上の SPARK 2.0
emr-5.17.0 の SPARK 2.3.1
これが出力を使用した私のテストです。Spark-shell を使用して実行してください
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
scala> val mydataindex = Array(0,1, 2, 3,4)
mydataindex: Array[Int] = Array(0, 1, 2, 3, 4)
scala> val mydata = sc.parallelize(for {
| x <- mydataindex
| y <- Array(123,456,789)
| } yield (x, y), 100)
mydata: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val rddMyData = mydata.partitionBy(new HashPartitioner(5))
rddMyData: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:26
scala> val rddMyDataPartitions = rddMyData.mapPartitionsWithIndex{
| (index, iterator) => {
| val myList = iterator.toList
| myList.map(x => x + " -> " + index).iterator
| }
| }
rddMyDataPartitions: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:26
scala>
| // this is expected:
scala> rddMyDataPartitions.take(100)
res1: Array[String] = Array((0,123) -> 0, (0,456) -> 0, (0,789) -> 0, (1,123) -> 1, (1,456) -> 1, (1,789) -> 1, (2,123) -> 2, (2,456) -> 2, (2,789) -> 2, (3,456) -> 3, (3,789) -> 3, (3,123) -> 3, (4,789) -> 4, (4,123) -> 4, (4,456) -> 4)
scala> val dfMyData = mydata.toDF()
dfMyData: org.apache.spark.sql.DataFrame = [_1: int, _2: int]
scala> val dfMyDataRepartitioned = dfMyData.repartition(5,col("_1"))
dfMyDataRepartitioned: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_1: int, _2: int]
scala> dfMyDataRepartitioned.explain(false)
== Physical Plan ==
Exchange hashpartitioning(_1#3, 5)
+- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#3, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#4]
+- Scan ExternalRDDScan[obj#2]
scala> val dfMyDataRepartitionedPartition = dfMyDataRepartitioned.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").count()
dfMyDataRepartitionedPartition: org.apache.spark.sql.DataFrame = [partition_id: int, count: bigint]
scala> // this is unexpected, because 1 partition has more indexes
scala> dfMyDataRepartitionedPartition.show()
+------------+-----+
|partition_id|count|
+------------+-----+
| 1| 6|
| 3| 3|
| 4| 3|
| 2| 3|
+------------+-----+
HashPartitioner がデータフレームを再分割する方法で使用されていることを最初に知りましたが、RDD で動作しているため、そうではないようです。
この「Exchange hashpartitioning」(上記の出力の説明を参照) がどのように機能するかを誰かが教えてくれますか?
2019-01-16 12:20: これはHow does HashPartitioner work?の複製ではありません。整数列の列 (+ パーティション数) による再分割のハッシュ アルゴリズムに興味があるためです。ソース コードからわかるように、一般的な HashPartitioner は期待どおりに機能しています。