0

データセット スキーマを作成する方法を理解するのに苦労しています。1 つの列にキーのタプルがあり、2 番目の列に集計がある集計からのデータセットがあります。

> ds.show
+------+------+
|    _1|    _2|
+------+------+
|[96,0]| 93439|
|[69,0]|174386|
|[42,0]| 12427|
|[15,0]|  2090|
|[80,0]|  2626|
|[91,0]| 71963|
|[64,0]|   191|
|[37,0]|    13|
|[48,0]| 13898|
|[21,0]|  2510|
|[59,0]|  1874|
|[32,0]|   373|
| [5,0]|  1075|
|[97,0]|     2|
|[16,0]|   492|
|[11,0]| 34040|
|[76,0]|     4|
|[22,0]|  1216|
|[60,0]|   522|
|[33,0]|   287|
+------+------+
only showing top 20 rows

> ds.schema
StructType(StructField(_1,StructType(StructField(src,IntegerType,false), StructField(dst,IntegerType,false)),true), StructField(_2,LongType,false))

このスキーマを適用できないのはなぜですか?

> val mySchema = StructType(StructField("_1",StructType(StructField("src",IntegerType,false), 
                                                        StructField("dst",IntegerType,false)),true), 
                            StructField("_2",LongType,false))
> ds.as[mySchema]

Name: Compile Error
Message: <console>:41: error: overloaded method value apply with alternatives:
  (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
 cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField)
       val returnSchema = StructType(StructField("_1",StructType(StructField("src",IntegerType,false),      
                                                      ^

Scala を反映しようとしても失敗しましたcase class:

> final case class AggSchema(edge: (Int, Int), count: Long)
> ds.as[AggSchema]

Name: org.apache.spark.sql.catalyst.analysis.UnresolvedException
Message: Invalid call to dataType on unresolved object, tree: 'edge
StackTrace: org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:59)
...
4

1 に答える 1

1

スキーマは型ではないため、最初のアプローチはそうではありません。スキーマは、列の Catalyst タイプを記述する単なるオブジェクトです。つまり、スキーマは、に格納されている値を解釈するために必要な単なるメタデータですDataFrame。それがなければ、それDataFrame以上のものDataset[Row]ではなくo.a.s.sql.Row、ほとんどSeq[Any].

フィールドの名前がスキーマと一致しないため、2 番目の方法は機能しません。列の名前以来。

case class Edge(src: Int, dst: Int)

val df = Seq((Edge(96, 0), 93439L)).toDF

名前をまったく使用しないでください。

df.as[((Int, Int), Long)]

または、使用されるケース クラスに一致するスキーマを使用します。たとえば、

case class AggSchema(edge: Edge, count: Long)

df.toDF("edge", "count").as[AggSchema]
于 2016-07-26T22:09:52.947 に答える