Shankar
Shankar

Reputation: 8967

get all the dates between two dates in Spark DataFrame

I have a DF in which I have bookingDt and arrivalDt columns. I need to find all the dates between these two dates.

Sample code:

df = spark.sparkContext.parallelize(
            [Row(vyge_id=1000, bookingDt='2018-01-01', arrivalDt='2018-01-05')]).toDF()
diffDaysDF = df.withColumn("diffDays", datediff('arrivalDt', 'bookingDt'))
diffDaysDF.show()

code output:

+----------+----------+-------+--------+
| arrivalDt| bookingDt|vyge_id|diffDays|
+----------+----------+-------+--------+
|2018-01-05|2018-01-01|   1000|       4|
+----------+----------+-------+--------+

What I tried was finding the number of days between two dates and calculate all the dates using timedelta function and explode it.

dateList = [str(bookingDt + timedelta(i)) for i in range(diffDays)]

Expected output:

Basically, I need to build a DF with a record for each date in between bookingDt and arrivalDt, inclusive.

+----------+----------+-------+----------+
| arrivalDt| bookingDt|vyge_id|txnDt     |
+----------+----------+-------+----------+
|2018-01-05|2018-01-01|   1000|2018-01-01|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01|   1000|2018-01-02|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01|   1000|2018-01-03|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01|   1000|2018-01-04|
+----------+----------+-------+----------+
|2018-01-05|2018-01-01|   1000|2018-01-05|
+----------+----------+-------+----------+

Upvotes: 19

Views: 62948

Answers (4)

werner
werner

Reputation: 14905

For Spark 2.4+ sequence can be used to create an array containg all dates between bookingDt and arrivalDt. This array can then be exploded.

from pyspark.sql import functions as F

df = df \
  .withColumn('bookingDt', F.col('bookingDt').cast('date')) \
  .withColumn('arrivalDt', F.col('arrivalDt').cast('date'))

df.withColumn('txnDt', F.explode(F.expr('sequence(bookingDt, arrivalDt, interval 1 day)')))\
  .show()

Output:

+-------+----------+----------+----------+
|vyge_id| bookingDt| arrivalDt|     txnDt|
+-------+----------+----------+----------+
|   1000|2018-01-01|2018-01-05|2018-01-01|
|   1000|2018-01-01|2018-01-05|2018-01-02|
|   1000|2018-01-01|2018-01-05|2018-01-03|
|   1000|2018-01-01|2018-01-05|2018-01-04|
|   1000|2018-01-01|2018-01-05|2018-01-05|
+-------+----------+----------+----------+

Upvotes: 29

Artem Zaika
Artem Zaika

Reputation: 1221

As @vvg suggested:

# I assume, bookindDt has dates range including arrivalDt, 
# otherwise you have to find intersection of unique dates of bookindDt and arrivalDt

dates_df = df.select('bookindDt').distinct()
dates_df = dates_df.withColumnRenamed('bookindDt', 'day_of_listing')

listing_days_df = df.join(dates_df, on=dates_df.day_of_listing.between(df.bookindDt, df.arrivalDt))

Output:

+----------+----------+-------+-------------------+
| arrivalDt| bookingDt|vyge_id|day_of_listing     |
+----------+----------+-------+-------------------+
|2018-01-05|2018-01-01|   1000|2018-01-01         |
+----------+----------+-------+-------------------+
|2018-01-05|2018-01-01|   1000|2018-01-02         |
+----------+----------+-------+-------------------+
|2018-01-05|2018-01-01|   1000|2018-01-03         |
+----------+----------+-------+-------------------+
|2018-01-05|2018-01-01|   1000|2018-01-04         |
+----------+----------+-------+-------------------+
|2018-01-05|2018-01-01|   1000|2018-01-05         |
+----------+----------+-------+-------------------+

Upvotes: 2

pault
pault

Reputation: 43544

As long as you're using Spark version 2.1 or higher, you can exploit the fact that we can use column values as arguments when using pyspark.sql.functions.expr():

Code:

import pyspark.sql.functions as f

diffDaysDF.withColumn("repeat", f.expr("split(repeat(',', diffDays), ',')"))\
    .select("*", f.posexplode("repeat").alias("txnDt", "val"))\
    .drop("repeat", "val", "diffDays")\
    .withColumn("txnDt", f.expr("date_add(bookingDt, txnDt)"))\
    .show()
#+----------+----------+-------+----------+
#| arrivalDt| bookingDt|vyge_id|     txnDt|
#+----------+----------+-------+----------+
#|2018-01-05|2018-01-01|   1000|2018-01-01|
#|2018-01-05|2018-01-01|   1000|2018-01-02|
#|2018-01-05|2018-01-01|   1000|2018-01-03|
#|2018-01-05|2018-01-01|   1000|2018-01-04|
#|2018-01-05|2018-01-01|   1000|2018-01-05|
#+----------+----------+-------+----------+

Upvotes: 19

vvg
vvg

Reputation: 6395

Well, you can do following.

Create a dataframe with dates only:

dates_df # with all days between first bookingDt and last arrivalDt

and then join those df with between condition:

df.join(dates_df, 
  on=col('dates_df.dates').between(col('df.bookindDt'), col('dt.arrivalDt'))
.select('df.*', 'dates_df.dates')

It might work even faster then solution with explode, however you need to figure out what is start and end date for this df. 10 years df will have just 3650 records not that many to worry about.

Upvotes: 11

Related Questions