0

Hadoop 2.7.2、Centos 7.2 で Apache Spark 2.0 を実行しているクラスターがあります。Spark DataFrame/DataSet API を使用していくつかの新しいコードを作成しましたが、Windows Azure Storage Blob (デフォルトの HDFS の場所) にデータを書き込んでから読み取った後、結合で誤った結果が生じることに気付きました。クラスターで実行されている次のコード スニペットで問題を再現できました。

case class UserDimensions(user: Long, dimension: Long, score: Double)
case class CentroidClusterScore(dimension: Long, cluster: Int, score: Double)

val dims = sc.parallelize(Array(UserDimensions(12345, 0, 1.0))).toDS
val cent = sc.parallelize(Array(CentroidClusterScore(0, 1, 1.0),CentroidClusterScore(1, 0, 1.0),CentroidClusterScore(2, 2, 1.0))).toDS

dims.show
cent.show
dims.join(cent, dims("dimension") === cent("dimension") ).show

出力

+-----+---------+-----+                                                         
| user|dimension|score|
+-----+---------+-----+
|12345|        0|  1.0|
+-----+---------+-----+

+---------+-------+-----+
|dimension|cluster|score|
+---------+-------+-----+
|        0|      1|  1.0|
|        1|      0|  1.0|
|        2|      2|  1.0|
+---------+-------+-----+

+-----+---------+-----+---------+-------+-----+
| user|dimension|score|dimension|cluster|score|
+-----+---------+-----+---------+-------+-----+
|12345|        0|  1.0|        0|      1|  1.0|
+-----+---------+-----+---------+-------+-----+

どちらが正しい。ただし、データの書き込みと読み取りの後、これが表示されます

dims.write.mode("overwrite").save("/tmp/dims2.parquet")
cent.write.mode("overwrite").save("/tmp/cent2.parquet")

val dims2 = spark.read.load("/tmp/dims2.parquet").as[UserDimensions]
val cent2 = spark.read.load("/tmp/cent2.parquet").as[CentroidClusterScore]

dims2.show
cent2.show

dims2.join(cent2, dims2("dimension") === cent2("dimension") ).show

出力

+-----+---------+-----+                                                         
| user|dimension|score|
+-----+---------+-----+
|12345|        0|  1.0|
+-----+---------+-----+

+---------+-------+-----+
|dimension|cluster|score|
+---------+-------+-----+
|        0|      1|  1.0|
|        1|      0|  1.0|
|        2|      2|  1.0|
+---------+-------+-----+

+-----+---------+-----+---------+-------+-----+
| user|dimension|score|dimension|cluster|score|
+-----+---------+-----+---------+-------+-----+
|12345|        0|  1.0|     null|   null| null|
+-----+---------+-----+---------+-------+-----+

ただし、RDD API を使用すると正しい結果が得られます

dims2.rdd.map( row => (row.dimension, row) ).join( cent2.rdd.map( row => (row.dimension, row) ) ).take(5)

res5: Array[(Long, (UserDimensions, CentroidClusterScore))] = Array((0,(UserDimensions(12345,0,1.0),CentroidClusterScore(0,1,1.0))))

出力形式を寄木細工ではなく ORC に変更しようとしましたが、同じ結果が得られます。クラスタではなくローカルで Spark 2.0 を実行すると、この問題は発生しません。また、Hadoop クラスターのマスター ノードでローカル モードで spark を実行することもできます。YARN の上で実行している場合にのみ、この問題が発生します。

これもこの問題と非常によく似ているようです: https://issues.apache.org/jira/browse/SPARK-10896

4

1 に答える 1