Reputation: 303
I have a data frame with updated stats of all users, and I would like to make a new data frame with the same information but keeping only the last updated stats for each day, using PySpark.
Example Input:
date                          id  stat1   stat2  stat3
2020-01-13T22:22:10.000+0000  1   173736  3043   2996
2020-01-13T22:43:19.000+0000  1   173775  3042   2996
2020-01-14T22:43:19.000+0000  1   173775  3042   2996
2020-01-15T22:43:19.000+0000  1   173775  3042   2996
2020-01-13T22:22:10.000+0000  2   257624  1500   53
2020-01-13T22:43:19.000+0000  2   257625  1500   65
Expected Output:
date                          id  stat1   stat2  stat3
2020-01-13T22:43:19.000+0000  1   173775  3042   2996
2020-01-14T22:43:19.000+0000  1   173775  3042   2996
2020-01-15T22:43:19.000+0000  1   173775  3042   2996
2020-01-13T22:43:19.000+0000  2   257625  1500   65
Upvotes: 1
Views: 516
Reputation: 8410
I would suggest using window functions. First cast the date column to TimestampType and derive a DateType column from it (to get the latest time within each date), then use max over row_number(), or last over an unbounded window, and finally filter to keep the required rows. Try both and see which fits your case better.
Dataframe:
df.show()
#+----------------------------+---+------+-----+-----+
#|date |id |stat1 |stat2|stat3|
#+----------------------------+---+------+-----+-----+
#|2020-01-13T22:22:10.000+0000|1 |173736|3043 |2996 |
#|2020-01-13T22:43:19.000+0000|1 |173775|3042 |2996 |
#|2020-01-14T22:43:19.000+0000|1 |173775|3042 |2996 |
#|2020-01-15T22:43:19.000+0000|1 |173775|3042 |2996 |
#|2020-01-13T22:22:10.000+0000|2 |257624|1500 |53 |
#|2020-01-13T22:43:19.000+0000|2 |257625|1500 |65 |
#+----------------------------+---+------+-----+-----+
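For anyone who wants to reproduce the outputs, a minimal sketch that builds the same sample DataFrame from the question's data is below (the `spark` session variable and the local master setting are assumptions, not part of the original answer):

    from pyspark.sql import SparkSession

    # Assumed local session just for trying the snippets below
    spark = SparkSession.builder.master("local[*]").getOrCreate()

    data = [
        ("2020-01-13T22:22:10.000+0000", 1, 173736, 3043, 2996),
        ("2020-01-13T22:43:19.000+0000", 1, 173775, 3042, 2996),
        ("2020-01-14T22:43:19.000+0000", 1, 173775, 3042, 2996),
        ("2020-01-15T22:43:19.000+0000", 1, 173775, 3042, 2996),
        ("2020-01-13T22:22:10.000+0000", 2, 257624, 1500, 53),
        ("2020-01-13T22:43:19.000+0000", 2, 257625, 1500, 65),
    ]
    df = spark.createDataFrame(data, ["date", "id", "stat1", "stat2", "stat3"])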
Using max:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# rank rows within each (id, day) by time, then keep the row whose rank equals the max rank
w = Window.partitionBy("id", "date1").orderBy("date")
w2 = Window.partitionBy("id", "date1")

df.withColumn("date", F.to_timestamp("date", "yyyy-MM-dd'T'HH:mm:ss"))\
  .withColumn("date1", F.to_date("date"))\
  .withColumn("rownum", F.row_number().over(w))\
  .withColumn("max", F.max("rownum").over(w2))\
  .filter('rownum=max').drop("date1", "rownum", "max")\
  .orderBy("id", "date").show(truncate=False)
#+-------------------+---+------+-----+-----+
#|date |id |stat1 |stat2|stat3|
#+-------------------+---+------+-----+-----+
#|2020-01-13 22:43:19|1 |173775|3042 |2996 |
#|2020-01-14 22:43:19|1 |173775|3042 |2996 |
#|2020-01-15 22:43:19|1 |173775|3042 |2996 |
#|2020-01-13 22:43:19|2 |257625|1500 |65 |
#+-------------------+---+------+-----+-----+
Using last function over an unbounded window:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# last("date") over an unbounded window gives every row the latest timestamp of its (id, day) group;
# keeping only the rows where date equals that value selects the last update of each day
w = Window.partitionBy("id", "date1").orderBy("date")\
          .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn("date", F.to_timestamp("date", "yyyy-MM-dd'T'HH:mm:ss"))\
  .withColumn("date1", F.to_date("date"))\
  .withColumn("last_date", F.last("date").over(w))\
  .filter('last_date=date').drop("date1", "last_date")\
  .orderBy("id", "date").show(truncate=False)
#+-------------------+---+------+-----+-----+
#|date |id |stat1 |stat2|stat3|
#+-------------------+---+------+-----+-----+
#|2020-01-13 22:43:19|1 |173775|3042 |2996 |
#|2020-01-14 22:43:19|1 |173775|3042 |2996 |
#|2020-01-15 22:43:19|1 |173775|3042 |2996 |
#|2020-01-13 22:43:19|2 |257625|1500 |65 |
#+-------------------+---+------+-----+-----+
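One caveat not covered by the original answer: the pattern "yyyy-MM-dd'T'HH:mm:ss" only ignores the trailing milliseconds and offset under Spark's legacy parser; on Spark 3+ with the newer, stricter parser the sample strings may fail to parse or come back null. A hedged sketch of a fuller pattern for that case (an assumption about your Spark version, to be swapped into the to_timestamp step of either snippet above):

    from pyspark.sql import functions as F

    # Assumption: Spark 3+ datetime patterns, where 'SSS' matches the milliseconds
    # and a single 'Z' matches an offset written as +0000
    df = df.withColumn("date", F.to_timestamp("date", "yyyy-MM-dd'T'HH:mm:ss.SSSZ"))

With the timestamp parsed this way, the rest of either snippet works unchanged.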
Upvotes: 1