Kishor

Reputation: 37

PySpark explode date range into rows

I need to explode date range into multiple rows with new start and end dates so the exploded rows have a range of one day only. I also need a new unique userId and need to retain start times and end times.

Input dataframe

userId | Start_Date_Time     | End_Date_Time
a      | 2022-12-10 08:00:00 | 2022-12-15 17:00:00
b      | 2022-12-06 05:00:00 | 2022-12-07 18:00:00

Desired Output:

userId | userIdNew | Start_Date_Time     | End_Date_Time       | Start_Date_Time_New | End_Date_Time_New
a      | a1        | 2022-12-10 08:00:00 | 2022-12-15 17:00:00 | 2022-12-10 08:00:00 | 2022-12-11 17:00:00
a      | a2        | 2022-12-10 08:00:00 | 2022-12-15 17:00:00 | 2022-12-11 08:00:00 | 2022-12-12 17:00:00
a      | a3        | 2022-12-10 08:00:00 | 2022-12-15 17:00:00 | 2022-12-12 08:00:00 | 2022-12-13 17:00:00
a      | a4        | 2022-12-10 08:00:00 | 2022-12-15 17:00:00 | 2022-12-13 08:00:00 | 2022-12-14 17:00:00
a      | a5        | 2022-12-10 08:00:00 | 2022-12-15 17:00:00 | 2022-12-14 08:00:00 | 2022-12-15 17:00:00
b      | b1        | 2022-12-06 05:00:00 | 2022-12-07 18:00:00 | 2022-12-06 05:00:00 | 2022-12-07 18:00:00

Upvotes: 0

Views: 1328

Answers (1)

Amir Hossein Shahdaei

Reputation: 1256

The `F.sequence` function builds an array of values between two given columns. Because it includes the upper bound as well (`[1, 3]` -> `[1, 2, 3]`), you need to reduce the end date by 1 day. After exploding the array you have your new start dates, and by adding 1 day to each you get the new end dates. For the new user id you can use `row_number` and concatenate it with the previous id.

from pyspark.sql import Window as W
from pyspark.sql import functions as F

# Assumes the columns are named 'id', 'startDate', 'endDate';
# rename these to match your DataFrame.
(
    df
    # cast to date so F.sequence can step in whole days
    .withColumn('startDate', F.col('startDate').cast('date'))
    .withColumn('endDate', F.col('endDate').cast('date'))
    # sequence is inclusive of its upper bound, so stop one day early
    .withColumn(
        'timeseries',
        F.sequence(
            F.col('startDate'),
            F.date_add(F.col('endDate'), -1),
            F.expr("INTERVAL 1 DAY")))
    # one row per day; each exploded value is a new start date
    .select(
        F.col('id'),
        F.col('startDate'),
        F.col('endDate'),
        F.explode('timeseries').alias('newStartDate'))
    .withColumn('newEndDate', F.date_add(F.col('newStartDate'), 1))
    # number the per-user rows and append the number to the old id
    .withColumn('rowNumber', F.row_number().over(W.partitionBy('id').orderBy('newStartDate')))
    .withColumn('newId', F.concat('id', 'rowNumber'))
    .drop('rowNumber')
).show()

Upvotes: 1
