Hadoop 2.7.2、Centos 7.2 で Apache Spark 2.0 を実行しているクラスターがあります。Spark DataFrame/DataSet API を使用していくつかの新しいコードを作成しましたが、Windows Azure Storage Blob (デフォルトの HDFS の場所) にデータを書き込んでから読み取った後、結合で誤った結果が生じることに気付きました。クラスターで実行されている次のコード スニペットで問題を再現できました。
case class UserDimensions(user: Long, dimension: Long, score: Double)
case class CentroidClusterScore(dimension: Long, cluster: Int, score: Double)
val dims = sc.parallelize(Array(UserDimensions(12345, 0, 1.0))).toDS
val cent = sc.parallelize(Array(CentroidClusterScore(0, 1, 1.0),CentroidClusterScore(1, 0, 1.0),CentroidClusterScore(2, 2, 1.0))).toDS
dims.show
cent.show
dims.join(cent, dims("dimension") === cent("dimension") ).show
出力
+-----+---------+-----+
| user|dimension|score|
+-----+---------+-----+
|12345| 0| 1.0|
+-----+---------+-----+
+---------+-------+-----+
|dimension|cluster|score|
+---------+-------+-----+
| 0| 1| 1.0|
| 1| 0| 1.0|
| 2| 2| 1.0|
+---------+-------+-----+
+-----+---------+-----+---------+-------+-----+
| user|dimension|score|dimension|cluster|score|
+-----+---------+-----+---------+-------+-----+
|12345| 0| 1.0| 0| 1| 1.0|
+-----+---------+-----+---------+-------+-----+
どちらが正しい。ただし、データの書き込みと読み取りの後、これが表示されます
dims.write.mode("overwrite").save("/tmp/dims2.parquet")
cent.write.mode("overwrite").save("/tmp/cent2.parquet")
val dims2 = spark.read.load("/tmp/dims2.parquet").as[UserDimensions]
val cent2 = spark.read.load("/tmp/cent2.parquet").as[CentroidClusterScore]
dims2.show
cent2.show
dims2.join(cent2, dims2("dimension") === cent2("dimension") ).show
出力
+-----+---------+-----+
| user|dimension|score|
+-----+---------+-----+
|12345| 0| 1.0|
+-----+---------+-----+
+---------+-------+-----+
|dimension|cluster|score|
+---------+-------+-----+
| 0| 1| 1.0|
| 1| 0| 1.0|
| 2| 2| 1.0|
+---------+-------+-----+
+-----+---------+-----+---------+-------+-----+
| user|dimension|score|dimension|cluster|score|
+-----+---------+-----+---------+-------+-----+
|12345| 0| 1.0| null| null| null|
+-----+---------+-----+---------+-------+-----+
ただし、RDD API を使用すると正しい結果が得られます
dims2.rdd.map( row => (row.dimension, row) ).join( cent2.rdd.map( row => (row.dimension, row) ) ).take(5)
res5: Array[(Long, (UserDimensions, CentroidClusterScore))] = Array((0,(UserDimensions(12345,0,1.0),CentroidClusterScore(0,1,1.0))))
出力形式を寄木細工ではなく ORC に変更しようとしましたが、同じ結果が得られます。クラスタではなくローカルで Spark 2.0 を実行すると、この問題は発生しません。また、Hadoop クラスターのマスター ノードでローカル モードで spark を実行することもできます。YARN の上で実行している場合にのみ、この問題が発生します。
これもこの問題と非常によく似ているようです: https://issues.apache.org/jira/browse/SPARK-10896