NewCode
NewCode

Reputation: 109

How to get Start and End date in PySpark?

I have a Spark dataframe (articleDF1) below, I am trying to add two columns Start and End date using the Date column to the Dataframe and grouping the resulting dataframe by post_evar10. The final Dataframe will have post_evar10, Start Date and End date

 -------+--------------------+
|      Date|         post_evar10|
+----------+--------------------+
|2019-09-02|www:/espanol/recu...|
|2019-09-02|www:/caregiving/h...|
|2019-12-15|www:/health/condi...|
|2019-09-01|www:/caregiving/h...|
|2019-08-31|www:/travel/trave...|
|2020-01-20|www:/home-family/...|

What I have tried:

from pyspark.sql import functions as f
articleDF3 = articleDF1.withColumn('Start_Date', f.min(f.col('Date'))).withColumn('Start_Date', f.max(f.col('Date'))).groupBy(f.col("post_evar10")).drop("Date")

Getting Error: org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, and 'temp.ms_article_lifespan_final.Date' is not an aggregate function. Wrap '(min(temp.ms_article_lifespan_final.Date) AS Start_Date)' in windowing function(s) or wrap 'temp.ms_article_lifespan_final.Date' in first() (or first_value) if you don't care which value you get.;;

Upvotes: 0

Views: 3471

Answers (1)

notNull
notNull

Reputation: 31520

Is this what your expected result?

To get min,max for each row we can use window function and get min,max then group by and in aggregation get the min,max values!

Example:

import sys
from pyspark.sql.window import Window
from pyspark.sql.functions import *

#Sample data
df=sc.parallelize([('2019-09-02','www:/espanol/r'),('2019-09-02','www:/caregiving/h'),('2019-12-15','www:/health/condi')]).toDF(['Date','post_evar10']).withColumn("Date",col("Date").cast("Date"))

#window on all rows
w = Window.orderBy("Date").rowsBetween(-sys.maxsize, sys.maxsize)
#or
w = Window.orderBy("Date").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn("min_Date",min("Date").over(w)).\ #get min value for Date
withColumn("max_Date",max("Date").over(w)).\ #get max value for Date
groupBy("post_evar10").\ #groupby on post_evar10
agg(min("min_Date").alias("Start_date"),max("max_Date").alias("End_date")).\ #get min,max
show()

#+-----------------+----------+----------+
#|      post_evar10|Start_date|  End_date|
#+-----------------+----------+----------+
#|   www:/espanol/r|2019-09-02|2019-12-15|
#|www:/caregiving/h|2019-09-02|2019-12-15|
#|www:/health/condi|2019-09-02|2019-12-15|
#+-----------------+----------+----------+

(or)

By using first,last functions over window:

df.withColumn("min_Date",first("Date").over(w)).\
withColumn("max_Date",last("Date").over(w)).\
groupBy("post_evar10").\
agg(min("min_Date").alias("Start_date"),max("max_Date").alias("End_date")).\
show()

Generate min,max for each post_evar10 unique value:

w = Window.partitionBy('post_evar10').orderBy("Date").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df=sc.parallelize([('2019-09-02','www:/espanol/r'),('2019-09-02','www:/caregiving/h'),('2019-09-03','www:/caregiving/h'),('2019-12-15','www:/health/condi')]).toDF(['Date','post_evar10']).withColumn("Date",col("Date").cast("Date"))

df.groupBy("post_evar10").\
agg(min("Date").alias("Start_date"),max("Date").alias("End_date")).\
show()

#+-----------------+----------+----------+
#|      post_evar10|Start_date|  End_date|
#+-----------------+----------+----------+
#|www:/health/condi|2019-12-15|2019-12-15|
#|   www:/espanol/r|2019-09-02|2019-09-02|
#|www:/caregiving/h|2019-09-02|2019-09-03|
#+-----------------+----------+----------+

Upvotes: 1

Related Questions