1

更新 これは、Databricks Spark CSV リーダーが DataFrame を作成する方法と関係があることがわかりました。以下の例では、Databricks CSV リーダーを使用して人とアドレスの CSV を読み取り、結果の DataFrame を Parquet 形式で HDFS に書き込みます。

コードを変更して DataFrame を作成しました: (people.csv と同様)

JavaRDD<Address> address = context.textFile("/Users/sfelsheim/data/address.csv").map(
            new Function<String, Address>() {
                public Address call(String line) throws Exception {
                    String[] parts = line.split(",");

                    Address addr = new Address();
                    addr.setAddrId(parts[0]);
                    addr.setCity(parts[1]);
                    addr.setState(parts[2]);
                    addr.setZip(parts[3]);

                    return addr;
                }
            });

結果の DataFrame を Parquet 形式で HDFS に書き込むと、結合が期待どおりに機能します。

どちらの場合もまったく同じ CSV を読んでいます。


HDFS 上の 2 つの異なる寄木細工のファイルから作成された 2 つの DataFrame の単純な結合を実行しようとすると、問題が発生します。


[main] INFO org.apache.spark.SparkContext - Spark バージョン 1.4.1 の実行

Hadoop 2.7.0からの HDFS の使用


説明するサンプルを次に示します。

 public void testStrangeness(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("joinIssue");
    JavaSparkContext context = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(context);

    DataFrame people = sqlContext.parquetFile("hdfs://localhost:9000//datalake/sample/people.parquet");
    DataFrame address = sqlContext.parquetFile("hdfs://localhost:9000//datalake/sample/address.parquet");

    people.printSchema();
    address.printSchema();

    // yeah, works
    DataFrame cartJoin = address.join(people);
    cartJoin.printSchema();

    // boo, fails 
    DataFrame joined = address.join(people,
            address.col("addrid").equalTo(people.col("addressid")));

    joined.printSchema();
}

人の中身

first,last,addressid 
your,mom,1 
fred,flintstone,2

アドレスの内容

addrid,city,state,zip
1,sometown,wi,4444
2,bedrock,il,1111

people.printSchema(); 

結果は...

root
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- addressid: integer (nullable = true)

address.printSchema();

結果は...

root
 |-- addrid: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)


DataFrame cartJoin = address.join(people);
cartJoin.printSchema();

デカルト結合は正常に機能し、printSchema() の結果は...

root
 |-- addrid: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- addressid: integer (nullable = true)

この結合...

DataFrame joined = address.join(people,
address.col("addrid").equalTo(people.col("addressid")));

次の例外が発生します。

Exception in thread "main" org.apache.spark.sql.AnalysisException: **Cannot resolve column name "addrid" among (addrid, city, state, zip);**
    at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159)
    at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:159)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:158)
    at org.apache.spark.sql.DataFrame.col(DataFrame.scala:558)
    at dw.dataflow.DataflowParser.testStrangeness(DataflowParser.java:36)
    at dw.dataflow.DataflowParser.main(DataflowParser.java:119)

people と address が共通のキー属性 (addressid) を持ち、使用されるように変更してみました。

address.join(people, "addressid");

しかし、同じ結果が得られました。

何か案は??

ありがとう

4

2 に答える 2

2

問題は、CSV ファイルが BOM 付きの UTF-8 形式であったことでした。DataBricks CSV 実装は、BOM 付きの UTF-8 を処理しません。ファイルを BOMなしでUTF-8 に変換すると、すべて正常に動作します。

于 2015-09-10T15:50:53.117 に答える
0

Notepad ++を使用してこれを修正できました。「エンコード」メニューで、「UTF-8 BOMでエンコード」から「UTF-8でエンコード」に切り替えました。

于 2017-03-10T15:48:09.497 に答える