Reputation: 1029
I use Spark 2.1.
If I run the following example:
import org.apache.spark.sql.functions.last
import spark.implicits._ // spark is the SparkSession (pre-imported in spark-shell)

val seq = Seq((123,"2016-01-01","1"),(123,"2016-01-02","2"),(123,"2016-01-03","3"))
val df = seq.toDF("id","date","score")
val dfAgg = df.sort("id","date").groupBy("id").agg(last("score"))
dfAgg.show
dfAgg.show
dfAgg.show
dfAgg.show
dfAgg.show
The output of the above code is:
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 1|
+---+------------------+
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 2|
+---+------------------+
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 1|
+---+------------------+
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 3|
+---+------------------+
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 3|
+---+------------------+
The intention was to get the score associated with the latest date for each id:
+---+------------------+
| id|last(score, false)|
+---+------------------+
|123| 3|
+---+------------------+
but this clearly hasn't worked, as the result is non-deterministic. Do we have to use window functions to achieve this?
Upvotes: 4
Views: 3322
Reputation: 41987
You can try orderBy instead of sort, even though the Scaladoc says they are the same:

/**
 * Returns a new Dataset sorted by the given expressions.
 * This is an alias of the sort function.
 *
 * @group typedrel
 * @since 2.0.0
 */
@scala.annotation.varargs
def orderBy(sortCol: String, sortCols: String*): Dataset[T] = sort(sortCol, sortCols : _*)

You can try:
val dfAgg = df.orderBy("id","date").groupBy("id").agg(last("score"))
Upvotes: -3
Reputation: 1029
Looking at the docs for org.apache.spark.sql.catalyst.expressions.aggregate.Last:
/**
* Returns the last value of `child` for a group of rows. If the last value of `child`
* is `null`, it returns `null` (respecting nulls). Even if [[Last]] is used on an already
* sorted column, if we do partial aggregation and final aggregation (when mergeExpression
* is used) its result will not be deterministic (unless the input table is sorted and has
* a single partition, and we use a single reducer to do the aggregation.).
*/
shows that, unfortunately, this is expected behaviour.
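As a side note, the parenthetical in that comment points at the exception: a sorted input held in a single partition, with a single reducer, should make last deterministic. A quick (and deliberately unscalable) way to see that, using coalesce(1) to force everything into one partition:

val dfAggOnePart = df.sort("id","date").coalesce(1).groupBy("id").agg(last("score"))
dfAggOnePart.show

This removes all parallelism, so it only illustrates the documented caveat rather than being a practical fix.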
So, in answer to my question, it seems that for now window functions, as described in SPARK DataFrame: select the first row of each group, may be the best way forward.
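For completeness, a minimal sketch of that approach against the df from the question (my own adaptation of the linked answer, so treat it as a starting point):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank each id's rows by date, newest first, then keep only the top-ranked row.
val w = Window.partitionBy("id").orderBy(col("date").desc)
val latest = df.withColumn("rn", row_number().over(w))
  .where(col("rn") === 1)
  .drop("rn")

latest.show

If two rows could share the same date, the window ordering would need a tie-breaker to stay deterministic.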
Upvotes: 5