user2682459

Reputation: 1029

Spark SQL Sort order not retained by GroupBy and Aggregation?

I use Spark 2.1.

If I run the following example:

import spark.implicits._                     // for toDF
import org.apache.spark.sql.functions.last   // for last(...)

val seq = Seq((123,"2016-01-01","1"),(123,"2016-01-02","2"),(123,"2016-01-03","3"))

val df = seq.toDF("id","date","score")

val dfAgg = df.sort("id","date").groupBy("id").agg(last("score"))

dfAgg.show
dfAgg.show
dfAgg.show
dfAgg.show
dfAgg.show

The output of the above code is:

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 1|
+---+------------------+

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 2|
+---+------------------+

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 1|
+---+------------------+

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 3|
+---+------------------+

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 3|
+---+------------------+

The intention was to get the score associated with the latest date for each id:

+---+------------------+
| id|last(score, false)|
+---+------------------+
|123|                 3|
+---+------------------+ 

but this clearly hasn't worked, as the result is non-deterministic. Do we have to use window functions to achieve this?

Upvotes: 4

Views: 3322

Answers (2)

Ramesh Maharjan

Reputation: 41987

You can try orderBy instead of sort, even though the Scaladoc says they are the same:

/**
 * Returns a new Dataset sorted by the given expressions.
 * This is an alias of the sort function.
 *
 * @group typedrel
 * @since 2.0.0
 */
@scala.annotation.varargs
def orderBy(sortCol: String, sortCols: String*): Dataset[T] = sort(sortCol, sortCols : _*)

You can try

val dfAgg = df.orderBy("id","date").groupBy("id").agg(last("score"))
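Since orderBy simply forwards to sort, switching between them should not change the result. A minimal sketch to check this, assuming a local SparkSession on a Spark 2.x build (the object name SortVsOrderBy and the app name are illustrative, not from the thread), compares the analyzed logical plans that the two calls produce for the question's data:

```scala
import org.apache.spark.sql.SparkSession

object SortVsOrderBy {
  // Compares the analyzed plans of sort(...) and orderBy(...) on the question's data.
  def plansMatch(spark: SparkSession): Boolean = {
    import spark.implicits._
    val df = Seq((123, "2016-01-01", "1"), (123, "2016-01-02", "2"), (123, "2016-01-03", "3"))
      .toDF("id", "date", "score")

    // orderBy delegates to sort, so both should yield the same Sort node over the same input.
    val viaSort = df.sort("id", "date").queryExecution.analyzed
    val viaOrderBy = df.orderBy("id", "date").queryExecution.analyzed
    viaSort == viaOrderBy
  }

  def main(args: Array[String]): Unit = {
    // Local mode is an assumption made for illustration only.
    val spark = SparkSession.builder().master("local[*]").appName("sort-vs-orderBy").getOrCreate()
    println(plansMatch(spark))
    spark.stop()
  }
}
```

If the two plans are equal, Spark runs the same query either way, so using orderBy would not remove the non-determinism described in the question.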

Upvotes: -3

user2682459

Reputation: 1029

Looking at the docs for org.apache.spark.sql.catalyst.expressions.aggregate.Last:

/**
 * Returns the last value of `child` for a group of rows. If the last value of `child`
 * is `null`, it returns `null` (respecting nulls). Even if [[Last]] is used on an already
 * sorted column, if we do partial aggregation and final aggregation (when mergeExpression
 * is used) its result will not be deterministic (unless the input table is sorted and has
 * a single partition, and we use a single reducer to do the aggregation.).
 */

shows that unfortunately this is expected behaviour.
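The doc comment can be illustrated with a plain-Scala model. No Spark is involved, the names are hypothetical, and this is a simplification rather than Spark's actual aggregation code. Each partition computes a partial "last" score per id, and the final step keeps whichever partial result is merged last, so the answer depends on the order in which the partial results arrive:

```scala
// A plain-Scala model (no Spark) of two-phase evaluation of a `last` aggregate.
object LastPartialAggModel {
  type Row = (Int, String, String) // (id, date, score)

  // Partial phase: each partition keeps the last score it saw for each id.
  def partialLast(partition: Seq[Row]): Map[Int, String] =
    partition.foldLeft(Map.empty[Int, String]) { case (acc, (id, _, score)) =>
      acc.updated(id, score)
    }

  // Final phase: merge partial results in arrival order; with Map ++ Map the
  // right-hand (later) value wins for a duplicate key, mirroring `last`.
  def finalLast(partials: Seq[Map[Int, String]]): Map[Int, String] =
    partials.foldLeft(Map.empty[Int, String])(_ ++ _)

  def main(args: Array[String]): Unit = {
    // The question's rows, each in its own (already sorted) partition.
    val partitions = Seq(
      Seq((123, "2016-01-01", "1")),
      Seq((123, "2016-01-02", "2")),
      Seq((123, "2016-01-03", "3")))
    val partials = partitions.map(partialLast)

    // Try every order in which a shuffle could deliver the partial results.
    val outcomes = partials.permutations.map(order => finalLast(order)(123)).toSet
    println(outcomes) // contains "1", "2" and "3": the result depends only on arrival order
  }
}
```

In this model, sorting the rows beforehand does not help: the outcome is decided by the merge order, which the shuffle does not guarantee.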

So, in answer to my question, it seems that for now window functions, as described in "SPARK DataFrame: select the first row of each group", may be the best way forward.
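A minimal sketch of that window-function approach, assuming a local SparkSession (the object name LatestScorePerId and the method latestScores are hypothetical): rank the rows of each id by date, newest first, with row_number, then keep only rank 1.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object LatestScorePerId {
  // Returns (id, score) for the row with the greatest date within each id.
  def latestScores(spark: SparkSession): Array[(Int, String)] = {
    import spark.implicits._
    val df = Seq((123, "2016-01-01", "1"), (123, "2016-01-02", "2"), (123, "2016-01-03", "3"))
      .toDF("id", "date", "score")

    // Rank rows inside each id by date, newest first.
    // "yyyy-MM-dd" strings sort chronologically, so no date parsing is needed here.
    val byIdNewestFirst = Window.partitionBy("id").orderBy(col("date").desc)

    df.withColumn("rn", row_number().over(byIdNewestFirst))
      .where(col("rn") === 1)
      .select("id", "score")
      .as[(Int, String)]
      .collect()
  }

  def main(args: Array[String]): Unit = {
    // Local mode is an assumption made for illustration only.
    val spark = SparkSession.builder().master("local[*]").appName("latest-score").getOrCreate()
    latestScores(spark).foreach(println) // (123,3)
    spark.stop()
  }
}
```

Because the ordering is part of the window specification, the result no longer depends on an earlier sort surviving the groupBy shuffle. If two rows for the same id share a date, row_number breaks the tie arbitrarily, so a secondary ordering column would be needed for a fully deterministic result.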

Upvotes: 5
