0

SparseMatrix を列の 1 つとして返すことを使用して集計を実行しmapGroups、列を合計しようとしています。

case class列名を提供するために、マップされた行のスキーマを作成しました。行列の列は と入力されorg.apache.spark.mllib.linalg.Matrixます。toDF集計を実行する前に実行しないと( select(sum("mycolumn"))、タイプの不一致エラーが 1 つ発生します ( required: org.apache.spark.sql.TypedColumn[MySchema,?])。含めるtoDFと、別のタイプの不一致エラーが発生します: cannot resolve 'sum(mycolumn)' due to data type mismatch: function sum requires numeric types, not org.apache.spark.mllib.linalg.MatrixUDT. それで、それを行う正しい方法は何ですか?

4

1 に答える 1

2

ここで、少なくとも 2 つの異なる問題に苦しんでいるようです。あなたがこのようなものを持っていると仮定しましょうDataset

val ds = Seq(
  ("foo",  Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))), 
  ("foo",  Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)))
).toDS

選択TypedColumn:

  • での暗黙的な変換の使用$:

    ds.select(col("_1").as[String])
    
  • 使用o.a.s.sql.functions.col:

    ds.select(col("_1").as[String])
    

行列の追加:

  • MLLibMatrixであり、MatrixUDT加算を実装しません。それはあなたがsum機能したり減らしたりすることができないことを意味します+
  • サードパーティの線形代数ライブラリを使用できますが、これは Spark SQL / Spark Dataset ではサポートされていません

本当にやりたい場合は、Datsets次のようなことを試すことができます。

ds.groupByKey(_._1).mapGroups(
  (key, values) => {
    val matrices = values.map(_._2.toArray)
    val first = matrices.next
    val sum = matrices.foldLeft(first)(
      (acc, m) => acc.zip(m).map { case (x, y) => x + y }
    )
    (key, sum)
})

マトリックスにマップしますが、個人的にはRDDに変換して使用しますbreeze

于 2016-07-21T23:25:36.427 に答える