5

私は次のデータフレームを持っています:

    |-----id-------|----value------|-----desc------|
    |     1        |     v1        |      d1       |
    |     1        |     v2        |      d2       |
    |     2        |     v21       |      d21      |
    |     2        |     v22       |      d22      |
    |--------------|---------------|---------------|

私はそれを次のように変換したい:

    |-----id-------|----value------|-----desc------|
    |     1        |     v1;v2     |      d1;d2    |
    |     2        |     v21;v22   |      d21;d22  |
    |--------------|---------------|---------------|
  • データフレーム操作で可能ですか?
  • この場合、rdd 変換はどのようになりますか?

rdd.reduce が鍵だと思いますが、このシナリオに適応させる方法がわかりません。

4

4 に答える 4

8

spark sql を使用してデータを変換できます

case class Test(id: Int, value: String, desc: String)
val data = sc.parallelize(Seq((1, "v1", "d1"), (1, "v2", "d2"), (2, "v21", "d21"), (2, "v22", "d22")))
  .map(line => Test(line._1, line._2, line._3))
  .df

data.registerTempTable("data")
val result = sqlContext.sql("select id,concat_ws(';', collect_list(value)),concat_ws(';', collect_list(value)) from data group by id")
result.show
于 2015-12-08T09:52:11.750 に答える
1

次のようなものがあるとします

import scala.util.Random

val sqlc: SQLContext = ???

case class Record(id: Long, value: String, desc: String)

val testData = for {
    (i, j) <- List.fill(30)(Random.nextInt(5), Random.nextInt(5))
  } yield Record(i, s"v$i$j", s"d$i$j")

val df = sqlc.createDataFrame(testData)

次のように簡単にデータを結合できます。

import sqlc.implicits._

def aggConcat(col: String) = df
      .map(row => (row.getAs[Long]("id"), row.getAs[String](col)))
      .aggregateByKey(Vector[String]())(_ :+ _, _ ++ _)

val result = aggConcat("value").zip(aggConcat("desc")).map{
      case ((id, value), (_, desc)) => (id, value, desc)
    }.toDF("id", "values", "descs") 

配列の代わりに文字列を連結したい場合は、後で実行できます

import org.apache.spark.sql.functions._

val resultConcat =  result
      .withColumn("values", concat_ws(";", $"values"))
      .withColumn("descs" , concat_ws(";", $"descs" ))
于 2015-12-08T09:06:27.390 に答える