
Let's say I have a data structure like this, where ts is some timestamp:

case class Record(ts: Long, id: Int, value: Int)

Given a large number of these records, I want to end up with the record with the highest timestamp for each id. Using the RDD API, I think the following code gets the job done:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

def findLatest(records: RDD[Record])(implicit spark: SparkSession) = {
  // Key by id, then keep whichever record has the higher timestamp per key
  records.keyBy(_.id).reduceByKey {
    (x, y) => if (x.ts > y.ts) x else y
  }.values
}
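
For example, with some hypothetical sample data (the values below are my own, and I'm assuming an implicit SparkSession named spark is in scope), I'd expect to get back one record per id:

// Hypothetical sample data, for illustration only
val records = spark.sparkContext.parallelize(Seq(
  Record(ts = 100L, id = 1, value = 10),
  Record(ts = 200L, id = 1, value = 20),
  Record(ts = 150L, id = 2, value = 30)
))

findLatest(records).collect()
// expected: Record(200, 1, 20) and Record(150, 2, 30), in some order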

Likewise, this is my attempt with Datasets:

import org.apache.spark.sql.{Dataset, SparkSession}

def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = {
  import spark.implicits._ // encoders needed by groupByKey and mapGroups

  records.groupByKey(_.id).mapGroups {
    // Reduce each id's group to the record with the highest ts
    case (id, recs) => recs.reduceLeft((x, y) => if (x.ts > y.ts) x else y)
  }
}
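
As an aside, I suspect reduceGroups expresses the same thing more directly than mapGroups, since it mirrors reduceByKey (the function name below is my own):

// Possibly more direct: reduceGroups yields a Dataset[(Int, Record)]
def findLatestReduced(records: Dataset[Record])(implicit spark: SparkSession) = {
  import spark.implicits._
  records.groupByKey(_.id)
    .reduceGroups((x, y) => if (x.ts > y.ts) x else y)
    .map(_._2) // drop the key, keep the winning record
}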

I've been trying to work out how to achieve something similar with DataFrames, but to no avail. I realise I can do the grouping with:

records.groupBy($"id")

But that gives me a RelationalGroupedDataset, and it's not clear to me what aggregation function I need to write to achieve what I want: all the example aggregations I've seen appear to return just a single aggregated column rather than the whole row.
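
The closest I've found is a workaround with a window function rather than a real aggregation (the function name below is my own):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank rows within each id by descending ts and keep only rank 1.
// This works, but it isn't an aggregation over the grouped data.
def findLatestDF(records: DataFrame): DataFrame = {
  val latestFirst = Window.partitionBy(col("id")).orderBy(col("ts").desc)
  records
    .withColumn("rn", row_number().over(latestFirst))
    .where(col("rn") === 1)
    .drop("rn")
}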

Is it possible to achieve this using DataFrames?

