更新 これは、Databricks Spark CSV リーダーが DataFrame を作成する方法と関係があることがわかりました。以下の例では、Databricks CSV リーダーを使用して人とアドレスの CSV を読み取り、結果の DataFrame を Parquet 形式で HDFS に書き込みます。
コードを変更して DataFrame を作成しました: (people.csv と同様)
JavaRDD<Address> address = context.textFile("/Users/sfelsheim/data/address.csv").map(
new Function<String, Address>() {
public Address call(String line) throws Exception {
String[] parts = line.split(",");
Address addr = new Address();
addr.setAddrId(parts[0]);
addr.setCity(parts[1]);
addr.setState(parts[2]);
addr.setZip(parts[3]);
return addr;
}
});
結果の DataFrame を Parquet 形式で HDFS に書き込むと、結合が期待どおりに機能します。
どちらの場合もまったく同じ CSV を読んでいます。
HDFS 上の 2 つの異なる寄木細工のファイルから作成された 2 つの DataFrame の単純な結合を実行しようとすると、問題が発生します。
[main] INFO org.apache.spark.SparkContext - Spark バージョン 1.4.1 の実行
Hadoop 2.7.0からの HDFS の使用
説明するサンプルを次に示します。
public void testStrangeness(String[] args) {
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("joinIssue");
JavaSparkContext context = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(context);
DataFrame people = sqlContext.parquetFile("hdfs://localhost:9000//datalake/sample/people.parquet");
DataFrame address = sqlContext.parquetFile("hdfs://localhost:9000//datalake/sample/address.parquet");
people.printSchema();
address.printSchema();
// yeah, works
DataFrame cartJoin = address.join(people);
cartJoin.printSchema();
// boo, fails
DataFrame joined = address.join(people,
address.col("addrid").equalTo(people.col("addressid")));
joined.printSchema();
}
人の中身
first,last,addressid
your,mom,1
fred,flintstone,2
アドレスの内容
addrid,city,state,zip
1,sometown,wi,4444
2,bedrock,il,1111
people.printSchema();
結果は...
root
|-- first: string (nullable = true)
|-- last: string (nullable = true)
|-- addressid: integer (nullable = true)
address.printSchema();
結果は...
root
|-- addrid: integer (nullable = true)
|-- city: string (nullable = true)
|-- state: string (nullable = true)
|-- zip: integer (nullable = true)
DataFrame cartJoin = address.join(people);
cartJoin.printSchema();
デカルト結合は正常に機能し、printSchema() の結果は...
root
|-- addrid: integer (nullable = true)
|-- city: string (nullable = true)
|-- state: string (nullable = true)
|-- zip: integer (nullable = true)
|-- first: string (nullable = true)
|-- last: string (nullable = true)
|-- addressid: integer (nullable = true)
この結合...
DataFrame joined = address.join(people,
address.col("addrid").equalTo(people.col("addressid")));
次の例外が発生します。
Exception in thread "main" org.apache.spark.sql.AnalysisException: **Cannot resolve column name "addrid" among (addrid, city, state, zip);**
at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159)
at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:158)
at org.apache.spark.sql.DataFrame.col(DataFrame.scala:558)
at dw.dataflow.DataflowParser.testStrangeness(DataflowParser.java:36)
at dw.dataflow.DataflowParser.main(DataflowParser.java:119)
people と address が共通のキー属性 (addressid) を持ち、使用されるように変更してみました。
address.join(people, "addressid");
しかし、同じ結果が得られました。
何か案は??
ありがとう