Pedro Daumas

Reputation: 303

PySpark: getting the last datetime for each day

I have a data frame with updated stats for all users, and I would like to build a new data frame with the same columns but containing only the last updated stats for each user on each day, using PySpark.

Example Input:

date                            id  stat1   stat2   stat3
2020-01-13T22:22:10.000+0000    1   173736  3043    2996
2020-01-13T22:43:19.000+0000    1   173775  3042    2996
2020-01-14T22:43:19.000+0000    1   173775  3042    2996
2020-01-15T22:43:19.000+0000    1   173775  3042    2996
2020-01-13T22:22:10.000+0000    2   257624  1500    53
2020-01-13T22:43:19.000+0000    2   257625  1500    65

Expected Output:

date                            id  stat1   stat2   stat3
2020-01-13T22:43:19.000+0000    1   173775  3042    2996
2020-01-14T22:43:19.000+0000    1   173775  3042    2996
2020-01-15T22:43:19.000+0000    1   173775  3042    2996
2020-01-13T22:43:19.000+0000    2   257625  1500    65

Upvotes: 1

Views: 516

Answers (1)

murtihash

Reputation: 8410

I would suggest using window functions. First convert the date column to TimestampType and derive a DateType column from it (needed to find the latest time within each date). Then either compare row_number() against its max over the partition, or take last over an unbounded window, and filter to keep the required rows. Try both and see which fits your case better.

Dataframe:

df.show()
#+----------------------------+---+------+-----+-----+
#|date                        |id |stat1 |stat2|stat3|
#+----------------------------+---+------+-----+-----+
#|2020-01-13T22:22:10.000+0000|1  |173736|3043 |2996 |
#|2020-01-13T22:43:19.000+0000|1  |173775|3042 |2996 |
#|2020-01-14T22:43:19.000+0000|1  |173775|3042 |2996 |
#|2020-01-15T22:43:19.000+0000|1  |173775|3042 |2996 |
#|2020-01-13T22:22:10.000+0000|2  |257624|1500 |53   |
#|2020-01-13T22:43:19.000+0000|2  |257625|1500 |65   |
#+----------------------------+---+------+-----+-----+

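Using row_number and max:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# w numbers the rows of each (id, day) partition in ascending time order;
# w2 spans the same partition unordered, so max() sees every row in it.
w = Window.partitionBy("id", "date1").orderBy("date")
w2 = Window.partitionBy("id", "date1")

(
    df
    # Parse the string into a timestamp, then derive the calendar day.
    .withColumn("date", F.to_timestamp("date", "yyyy-MM-dd'T'HH:mm:ss"))
    .withColumn("date1", F.to_date("date"))
    .withColumn("rownum", F.row_number().over(w))
    .withColumn("max_rownum", F.max("rownum").over(w2))
    # The row whose number equals the partition maximum is the latest one.
    .filter("rownum = max_rownum")
    .drop("date1", "rownum", "max_rownum")
    .orderBy("id", "date")
    .show(truncate=False)
)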

#+-------------------+---+------+-----+-----+
#|date               |id |stat1 |stat2|stat3|
#+-------------------+---+------+-----+-----+
#|2020-01-13 22:43:19|1  |173775|3042 |2996 |
#|2020-01-14 22:43:19|1  |173775|3042 |2996 |
#|2020-01-15 22:43:19|1  |173775|3042 |2996 |
#|2020-01-13 22:43:19|2  |257625|1500 |65   |
#+-------------------+---+------+-----+-----+

Using the last function over an unbounded window:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# The unbounded frame lets last() see the whole (id, day) partition,
# not just the rows up to the current one.
w = (
    Window.partitionBy("id", "date1")
    .orderBy("date")
    .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

(
    df
    .withColumn("date", F.to_timestamp("date", "yyyy-MM-dd'T'HH:mm:ss"))
    .withColumn("date1", F.to_date("date"))
    # last_date holds the latest timestamp of the partition on every row.
    .withColumn("last_date", F.last("date").over(w))
    .filter("last_date = date")
    .drop("date1", "last_date")
    .orderBy("id", "date")
    .show(truncate=False)
)

#+-------------------+---+------+-----+-----+
#|date               |id |stat1 |stat2|stat3|
#+-------------------+---+------+-----+-----+
#|2020-01-13 22:43:19|1  |173775|3042 |2996 |
#|2020-01-14 22:43:19|1  |173775|3042 |2996 |
#|2020-01-15 22:43:19|1  |173775|3042 |2996 |
#|2020-01-13 22:43:19|2  |257625|1500 |65   |
#+-------------------+---+------+-----+-----+
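
To sanity-check the expected result without a Spark session, the same "partition by id and day, keep the latest row" logic can be sketched in plain Python. This is only an illustration, not part of the Spark solution: the helper names `parse_ts` and `last_row_per_day` are made up for this sketch, the rows are copied from the question, and the calendar day is taken from the timestamp exactly as written (offset +0000).

```python
from datetime import datetime

# Sample rows copied from the question: (date, id, stat1, stat2, stat3).
rows = [
    ("2020-01-13T22:22:10.000+0000", 1, 173736, 3043, 2996),
    ("2020-01-13T22:43:19.000+0000", 1, 173775, 3042, 2996),
    ("2020-01-14T22:43:19.000+0000", 1, 173775, 3042, 2996),
    ("2020-01-15T22:43:19.000+0000", 1, 173775, 3042, 2996),
    ("2020-01-13T22:22:10.000+0000", 2, 257624, 1500, 53),
    ("2020-01-13T22:43:19.000+0000", 2, 257625, 1500, 65),
]


def parse_ts(text):
    """Hypothetical helper: parse the question's timestamp strings."""
    return datetime.strptime(text, "%Y-%m-%dT%H:%M:%S.%f%z")


def last_row_per_day(rows):
    """Hypothetical helper: keep the latest row for each (id, day) pair."""
    latest = {}
    for row in rows:
        ts = parse_ts(row[0])
        key = (row[1], ts.date())  # plays the role of partitionBy("id", "date1")
        if key not in latest or ts > latest[key][0]:
            latest[key] = (ts, row)
    # Sort by id, then timestamp, like orderBy("id", "date").
    ordered = sorted(latest.values(), key=lambda pair: (pair[1][1], pair[0]))
    return [row for _, row in ordered]


result = last_row_per_day(rows)
for row in result:
    print(row)
```

On the sample data this keeps four rows, one per (id, day) pair, which is the same set of rows as the expected output in the question.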

Upvotes: 1
