4

Zeppelin ノートブックで Spark を使用していますが、groupByKey() が機能していないようです。

このコード:

df.groupByKey(row => row.getLong(0))
  .mapGroups((key, iterable) => println(key))

このエラーが表示されます (おそらく、コンパイル エラーです。作業中のデータセットがかなり大きいため、すぐに表示されるためです)。

error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

ケースクラスを追加して、すべての行をそれにマップしようとしましたが、それでも同じエラーが発生しました

import spark.implicits._

case class DFRow(profileId: Long, jobId: String, state: String)

def getDFRow(row: Row):DFRow = {
    return DFRow(row.getLong(row.fieldIndex("item0")),
                 row.getString(row.fieldIndex("item1")), 
                 row.getString(row.fieldIndex("item2")))
}

df.map(DFRow(_))
  .groupByKey(row => row.getLong(0))
  .mapGroups((key, iterable) => println(key))

私のデータフレームのスキーマは次のとおりです。

root
|-- item0: long (nullable = true)
|-- item1: string (nullable = true)
|-- item2: string (nullable = true)
4

1 に答える 1

6

mapGroups関数を使用しようとしていますが、 for(Long, Iterator[Row]) => Unitがありません(関数を使用しても意味がないというわけではありません)。EncoderUnit

一般Datasetに、SQL DSL に重点を置いていない API の部分 ( DataFrame => DataFrameDataFrame => RelationalGroupedDatasetRelationalGroupedDataset => DataFrameRelationalGroupedDataset => RelationalGroupedDataset) では、出力値に対して暗黙的または明示的なエンコーダーが必要です。

Rowオブジェクトには事前定義されたエンコーダーがないDataset[Row]ため、静的に型指定されたデータのメソッド設計を使用することはあまり意味がありません。経験則として、常に最初に静的に型付けされたバリアントに変換する必要があります。

df.as[(Long, String, String)]

データフレーム行を更新された行にマップしようとしたときのエンコーダ エラーも参照してください。

于 2016-09-15T18:59:01.983 に答える