CopyOfA

Reputation: 851

Expand PySpark dataframe for missing dates

I have a PySpark dataframe with several measures for each (key1, key2, key3, date) tuple. That is:

+-----+-----+-----+----------+-----+-----+
| key1| key2| key3|      date|val_1|val_2|
+-----+-----+-----+----------+-----+-----+
|pk1  |name1|   VA|2022-03-06|    0|    3|
|pk1  |name1|   VA|2022-03-07|    2|    4|
|pk1  |name1|   VA|2022-03-09|    3|    4|
|pk2  |name2|   NC|2022-03-06|    4|    1|
|pk2  |name2|   NC|2022-03-08|    2|    6|
|pk2  |name2|   NC|2022-03-09|    1|    4|
+-----+-----+-----+----------+-----+-----+

This table has about 5600 unique (key1, key2, key3) tuples. I want to fill in the missing dates so that each key tuple has a complete one-day date sequence. The resulting table should be:

+-----+-----+-----+----------+-----+-----+
| key1| key2| key3|      date|val_1|val_2|
+-----+-----+-----+----------+-----+-----+
|pk1  |name1|   VA|2022-03-06|    0|    3|
|pk1  |name1|   VA|2022-03-07|    2|    4|
|pk1  |name1|   VA|2022-03-08|   NA|   NA|
|pk1  |name1|   VA|2022-03-09|    3|    4|
|pk2  |name2|   NC|2022-03-06|    4|    1|
|pk2  |name2|   NC|2022-03-07|   NA|   NA|
|pk2  |name2|   NC|2022-03-08|    2|    6|
|pk2  |name2|   NC|2022-03-09|    1|    4|
+-----+-----+-----+----------+-----+-----+

Here is what I've tried:

import datetime

from pyspark.sql import functions as F

minDate = df.select(F.min("date")).first()["min(date)"]
maxDate = df.select(F.max("date")).first()["max(date)"]

dateList = ",".join([str(maxDate - datetime.timedelta(days=x)) for x in range((maxDate - minDate).days + 1)])

df = df.select("key1", "key2", "key3", F.explode(F.split(dateList, ",")).alias("date"))

I pulled this solution from this SO answer: How to expand out a Pyspark dataframe based on column?. My plan was to construct this "full" (key1, key2, key3, date) dataframe and then join it with the original dataframe. The error I'm getting is:

You're referencing the column `2022-03-20,2022-03-19,2022-03-18,2022-03-17,2022-03-16,2022-03-15,2022-03-14,2022-03-13,2022-03-12,2022-03-11,2022-03-10,2022-03-09,2022-03-08,2022-03-07,2022-03-06`, but it is missing from the schema.

Upvotes: 1

Views: 1480

Answers (3)

CopyOfA

Reputation: 851

I already accepted wwnde's answer, but I figured I'd post the solution I had working before I switched to theirs.

import datetime

from pyspark.sql import functions as F

# Global date range across the whole dataframe
minDate = df.select(F.min("date")).first()["min(date)"]
maxDate = df.select(F.max("date")).first()["max(date)"]

# Every date between minDate and maxDate, inclusive
dateList = [maxDate - datetime.timedelta(days=x) for x in range((maxDate - minDate).days + 1)]

# One row per distinct key tuple, carrying the full date range as an array
fullDateDf = (
    df
    .select(["key1", "key2", "key3"])
    .dropDuplicates()
    .withColumn("date", F.array([F.lit(x) for x in dateList]))
)

# Explode the array so each (key tuple, date) pair gets its own row
fullDateDf = fullDateDf.select(
    [
        "key1",
        "key2",
        "key3",
        F.explode(F.col("date")).alias("date")
    ]
)

# Outer join fills the missing dates with nulls for val_1/val_2
df = (
    df
    .join(
        fullDateDf,
        on=["key1", "key2", "key3", "date"],
        how="outer"
    )
)

I think wwnde's answer is more concise, but I figured I'd share another way of doing this.
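For reference, a similar grid can be built without the intermediate array column by cross-joining the distinct key tuples with a one-column dataframe of dates. A minimal sketch, assuming the same spark session and the dateList defined above:

# Hypothetical alternative: build the dates as their own dataframe,
# then cross-join with the distinct key tuples
datesDf = spark.createDataFrame([(d,) for d in dateList], ["date"])
keysDf = df.select("key1", "key2", "key3").dropDuplicates()
fullDateDf = keysDf.crossJoin(datesDf)  # one row per (key tuple, date)

result = df.join(fullDateDf, on=["key1", "key2", "key3", "date"], how="outer")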

Upvotes: 0

wwnde

Reputation: 26676

from pyspark.sql.functions import expr, to_date

df = df.withColumn('date', to_date('date'))  # format date if string

# Compute per-key max and min dates, then sequence() generates the 1-day range
new = (
    df.groupby('key1', 'key2', 'key3')
    .agg(expr('max(date)').alias('max_date'), expr('min(date)').alias('min_date'))
    .withColumn('date', expr("explode(sequence(min_date, max_date, interval 1 day))"))
    .drop('max_date', 'min_date')
)

# Join new df back to df
df.join(new, how='right', on=['key1', 'key2', 'key3', 'date']).show()

+----+-----+----+----------+-----+-----+
|key1| key2|key3|      date|val_1|val_2|
+----+-----+----+----------+-----+-----+
| pk1|name1|  VA|2022-03-06|    0|    3|
| pk1|name1|  VA|2022-03-07|    2|    4|
| pk1|name1|  VA|2022-03-08| null| null|
| pk1|name1|  VA|2022-03-09|    3|    4|
| pk2|name2|  NC|2022-03-06|    4|    1|
| pk2|name2|  NC|2022-03-07| null| null|
| pk2|name2|  NC|2022-03-08|    2|    6|
| pk2|name2|  NC|2022-03-09|    1|    4|
+----+-----+----+----------+-----+-----+
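One caveat: min_date and max_date come from the groupby, so sequence() only fills gaps within each key's own observed date range. If every key should instead span the global range, the bounds can be computed over the whole dataframe with an empty window. A hedged sketch of that variant:

from pyspark.sql import Window
from pyspark.sql.functions import expr, min as min_, max as max_

# Hypothetical variant: global min/max date for every key
# (an empty partitionBy() windows over the whole dataframe)
w = Window.partitionBy()
new = (
    df.withColumn('min_date', min_('date').over(w))
      .withColumn('max_date', max_('date').over(w))
      .select('key1', 'key2', 'key3', 'min_date', 'max_date')
      .dropDuplicates()
      .withColumn('date', expr("explode(sequence(min_date, max_date, interval 1 day))"))
      .drop('min_date', 'max_date')
)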

Upvotes: 3

annld

Reputation: 69

It should be:

df = df.select("key1", "key2", "key3", F.explode(F.split(F.lit(dateList), ",")).alias("date"))

Use lit() to create a Column from the literal value. By the way, you should drop duplicates by "key1", "key2", "key3" first.
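Putting both suggestions together, a minimal sketch (assuming dateList is the comma-joined string built in the question):

from pyspark.sql import functions as F

fullDateDf = (
    df.select("key1", "key2", "key3")
    .dropDuplicates()  # one row per key tuple before exploding
    .select(
        "key1",
        "key2",
        "key3",
        F.explode(F.split(F.lit(dateList), ",")).alias("date"),
    )
    .withColumn("date", F.to_date("date"))  # split() yields strings
)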

Upvotes: -1
