krishna kaushik

Reputation: 49

How to create additional rows of missing dateids in PySpark?

I have a PySpark DataFrame structured as follows.

| date_id     | anchor |   subs_ids  |
| --------    | ------ | ----------- |
| 2023-10-01  |   123  | ['456','345']|
| 2023-10-01  |   256  | ['278','875']|
| 2023-10-03  |   123  | ['703','409']|
| 2023-10-03  |   256  | ['801','704']|
| 2023-10-06  |   123  | ['398','209']|
| 2023-10-06  |   256  | ['659','783']|

This DataFrame is partitioned by date_id, and there are missing days in between (e.g., 2023-10-02, 2023-10-04, and 2023-10-05).

I would like to create additional rows for the missing dates, where each new row contains the respective date ID and the same column values as the latest previous partition.

For example, for the missing date 2023-10-02, I want to create two additional rows with this date ID with the same column values as present in 2023-10-01. Similarly, for 2023-10-04 and 2023-10-05, create two additional rows for each date with the same column values as present in 2023-10-03.

The final desired table, in ascending order of date ID, would be as below.

    | date_id     | anchor |   subs_ids  |
    | --------    | ------ | ----------- |
    | 2023-10-01  |   123  | ['456','345']|
    | 2023-10-01  |   256  | ['278','875']|
    | 2023-10-02  |   123  | ['456','345']|
    | 2023-10-02  |   256  | ['278','875']|
    | 2023-10-03  |   123  | ['703','409']|
    | 2023-10-03  |   256  | ['801','704']|
    | 2023-10-04  |   123  | ['703','409']|
    | 2023-10-04  |   256  | ['801','704']|
    | 2023-10-05  |   123  | ['703','409']|
    | 2023-10-05  |   256  | ['801','704']|
    | 2023-10-06  |   123  | ['398','209']|
    | 2023-10-06  |   256  | ['659','783']|

How can I achieve this in Python, specifically with PySpark functionality?

I've been trying various approaches without success. Any help or guidance would be greatly appreciated. Thank you!
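
For reference, a minimal sketch of how the input DataFrame could be constructed (the column names and the choice of subs_ids as an array of strings are assumptions based on the table above):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# sample input; date_id is cast to a proper date, subs_ids is an array of strings
df = spark.createDataFrame(
    [
        ('2023-10-01', 123, ['456', '345']),
        ('2023-10-01', 256, ['278', '875']),
        ('2023-10-03', 123, ['703', '409']),
        ('2023-10-03', 256, ['801', '704']),
        ('2023-10-06', 123, ['398', '209']),
        ('2023-10-06', 256, ['659', '783']),
    ],
    ['date_id', 'anchor', 'subs_ids'],
).withColumn('date_id', F.to_date('date_id'))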

Upvotes: 0

Views: 74

Answers (2)

samkart

Reputation: 6654

You can create the list of dates using the `sequence` function and then explode it. The `lead` window gives the next available date per anchor, and `coalesce` keeps the last date per anchor, where `lead` returns null. (The snippet uses `dateid` and `subids` as column names.)

from pyspark.sql import functions as func
from pyspark.sql.window import Window as wd

data_sdf. \
    withColumn('next_dateid', func.lead('dateid').over(wd.partitionBy('anchor').orderBy('dateid'))). \
    withColumn('dtseq', func.expr('sequence(dateid, date_add(next_dateid, -1), interval 1 day)')). \
    selectExpr('explode(coalesce(dtseq, array(dateid))) as dateid', 'anchor', 'subids'). \
    orderBy('dateid', 'anchor'). \
    show(truncate=False)

# +----------+------+----------+
# |dateid    |anchor|subids    |
# +----------+------+----------+
# |2023-10-01|123   |[456, 345]|
# |2023-10-01|256   |[278, 875]|
# |2023-10-02|123   |[456, 345]|
# |2023-10-02|256   |[278, 875]|
# |2023-10-03|123   |[703, 409]|
# |2023-10-03|256   |[801, 704]|
# |2023-10-04|123   |[703, 409]|
# |2023-10-04|256   |[801, 704]|
# |2023-10-05|123   |[703, 409]|
# |2023-10-05|256   |[801, 704]|
# |2023-10-06|123   |[398, 209]|
# |2023-10-06|256   |[659, 783]|
# +----------+------+----------+

Upvotes: 1

Daniel Perez Efremova

Reputation: 710

Just adapting the answer from "Pyspark: fill missing dates with latest row value" to your needs:

import datetime

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import DateType, StructField, StructType

# schema for the helper dataframe holding the date range
date_schema = StructType([StructField('date', DateType(), True)])

max_date = df.select(F.max('date')).first()['max(date)']
min_date = df.select(F.min('date')).first()['min(date)']

delta = max_date - min_date
dates_list = [(max_date - datetime.timedelta(days=x),) for x in range(delta.days)]

# if there are missing rows
if dates_list:
    # create df with one column 'date'
    dates_df = spark.createDataFrame(dates_list, schema=date_schema)

    # join with original df
    df = df.join(F.broadcast(dates_df), ['date'], 'outer')

    w = Window.orderBy('date').rangeBetween(Window.unboundedPreceding, 0)

    # fill all columns with latest non null col value
    for c in df.columns:
        if c != 'date':
            df = df.withColumn(c, F.last(c, ignorenulls=True).over(w))

Upvotes: 0
