6

コンパイル時の型の安全性により、Spark 2.0 DataSet に非常に満足しています。しかし、ここで解決できない問題がいくつかあります。これに関する適切なドキュメントも見つかりませんでした。

問題 #1 - 集計された列での除算操作 - 以下のコードを検討してください - DataSet[MyCaseClass] があり、c1、c2、c3 および sum(c4) / 8 で groupByKey を使用したいと考えていました。 sum ですが、divide(8) のコンパイル時エラーが発生します。どうすれば次のことを達成できるのだろうか。

final case class MyClass (c1: String,
                          c2: String,
                          c3: String,
                          c4: Double)

    val myCaseClass: DataSet[MyCaseClass] = ??? // assume it's being loaded

    import sparkSession.implicits._
    import org.apache.spark.sql.expressions.scalalang.typed.{sum => typedSum}

     myCaseClass.
       groupByKey(myCaseClass =>
          (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
          agg(typedSum[MyCaseClass](_.c4).name("sum(c4)").
          divide(8)). //this is breaking with exception
       show()

.divide(8) 操作を削除して上記のコマンドを実行すると、以下の出力が得られます。

+-----------+-------------+
|        key|sum(c4)      |
+-----------+-------------+
| [A1,F2,S1]|         80.0|
| [A1,F1,S1]|         40.0|  
+-----------+-------------+

問題 2 - groupedByKey の結果を別の Typed DataFrame に変換する - 問題 の 2 番目の部分は、再度 Typed DataSet を出力したいということです。そのために、別のケースクラスがあります(必要かどうかはわかりません)が、グループ化された結果でマップする方法がわかりません-

final case class AnotherClass(c1: String,
                          c2: String,
                          c3: String,
                          average: Double) 

 myCaseClass.
           groupByKey(myCaseClass =>
              (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
              agg(typedSum[MyCaseClass](_.c4).name("sum(c4)")).
as[AnotherClass] //this is breaking with exception

しかし、キーの結果によってグループ化された結果がAnotherClassに直接マップされていないため、これも例外で失敗します。

PS:上記を達成するための他の解決策は大歓迎です。

4

1 に答える 1