Reputation: 389
I have an input DataFrame, for example as below: df_inp
customer_id |ph_num |date       |
1           |123    |2020-10-01 |
2           |456    |2020-10-01 |
3           |789    |2020-10-01 |
1           |654    |2020-10-02 |
2           |543    |2020-10-03 |
1           |908    |2020-10-04 |
4           |123    |2020-10-02 |
I need to get the latest record per customer for every daily run. I tried a window rank() operation and it works. But since the input data comes in millions of rows, is there any other Spark operation that can get the latest record per customer_id, ordered by the date value, with better performance?
from pyspark.sql import Window
from pyspark.sql.functions import rank

window_func = Window.partitionBy("customer_id").orderBy("date")
df = df.withColumn("rank", rank().over(window_func))
df = df.filter(df.rank == 1)
Here, customer_id is a string and date is a timestamp.
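For reference, a minimal self-contained sketch of the window approach, using the sample data above; note that it orders by date descending so that rank 1 is the latest record per customer (with ascending order, rank 1 would give the earliest):

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, rank

spark = SparkSession.builder.getOrCreate()

# Sample data matching df_inp above (customer_id as string, date cast to timestamp)
df_inp = spark.createDataFrame(
    [("1", "123", "2020-10-01"), ("2", "456", "2020-10-01"),
     ("3", "789", "2020-10-01"), ("1", "654", "2020-10-02"),
     ("2", "543", "2020-10-03"), ("1", "908", "2020-10-04"),
     ("4", "123", "2020-10-02")],
    ["customer_id", "ph_num", "date"],
).withColumn("date", col("date").cast("timestamp"))

# Rank rows per customer by date descending and keep the latest one
window_func = Window.partitionBy("customer_id").orderBy(col("date").desc())
df_latest = (
    df_inp.withColumn("rank", rank().over(window_func))
          .filter(col("rank") == 1)
          .drop("rank")
)
df_latest.show()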
Upvotes: 1
Views: 698
Reputation: 14845
For Spark 3.0+ it might be worth checking whether max_by (or min_by, if you take rank 1 as in the question) has better performance characteristics than the window + filter approach.
df.groupBy("customer_id").agg(F.expr("max_by(ph_num,date)"), F.max(F.col("date")))
The result is the same as in the question. Comparing the execution plans of both approaches, the max_by way has one transformation fewer (the filter), but both approaches will trigger one exchange.
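For completeness, a sketch of the max_by aggregation with aliased output columns (df_inp and the F alias as assumed above), plus an explain() call to inspect the physical plan and confirm the single exchange:

import pyspark.sql.functions as F

# Latest ph_num per customer via max_by, plus the corresponding max date
df_latest = df_inp.groupBy("customer_id").agg(
    F.expr("max_by(ph_num, date)").alias("ph_num"),
    F.max("date").alias("date"),
)

df_latest.show()
df_latest.explain()  # one shuffle (exchange), no filter step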
Upvotes: 1