27

私は Databricks の例に取り組んでいます。データフレームのスキーマは次のようになります。

> parquetDF.printSchema
root
|-- department: struct (nullable = true)
|    |-- id: string (nullable = true)
|    |-- name: string (nullable = true)
|-- employees: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- firstName: string (nullable = true)
|    |    |-- lastName: string (nullable = true)
|    |    |-- email: string (nullable = true)
|    |    |-- salary: integer (nullable = true)

この例では、従業員列を 4 つの追加列に分解する方法を示しています。

val explodeDF = parquetDF.explode($"employees") { 
case Row(employee: Seq[Row]) => employee.map{ employee =>
  val firstName = employee(0).asInstanceOf[String]
  val lastName = employee(1).asInstanceOf[String]
  val email = employee(2).asInstanceOf[String]
  val salary = employee(3).asInstanceOf[Int]
  Employee(firstName, lastName, email, salary)
 }
}.cache()
display(explodeDF)

部門列で同様のことを行うにはどうすればよいですか (つまり、「id」と「name」というデータフレームに 2 つの列を追加します)。メソッドはまったく同じではなく、以下を使用して新しいデータ フレームを作成する方法しかわかりません。

val explodeDF = parquetDF.select("department.id","department.name")
display(explodeDF)

私が試してみると:

val explodeDF = parquetDF.explode($"department") { 
  case Row(dept: Seq[String]) => dept.map{dept => 
  val id = dept(0) 
  val name = dept(1)
  } 
}.cache()
display(explodeDF)

警告とエラーが表示されます:

<console>:38: warning: non-variable type argument String in type pattern Seq[String] is unchecked since it is eliminated by erasure
            case Row(dept: Seq[String]) => dept.map{dept => 
                           ^
<console>:37: error: inferred type arguments [Unit] do not conform to    method explode's type parameter bounds [A <: Product]
  val explodeDF = parquetDF.explode($"department") { 
                                   ^
4

3 に答える 3

22

次のようなものを使用できます。

var explodeDF = explodeDF.withColumn("id", explodeDF("department.id"))
explodeDeptDF = explodeDeptDF.withColumn("name", explodeDeptDF("department.name"))

あなたが私を助けてくれたものとこれらの質問:

于 2016-09-01T15:54:21.527 に答える