ImNewToThis
ImNewToThis

Reputation: 153

Forward Fill New Row to Account for Missing Dates

I currently have a dataset grouped into hourly increments by a variable "aggregator". There are gaps in this hourly data and what i would ideally like to do is forward fill the rows with the prior row which maps to the variable in column x.

I've seen some solutions to similar problems using PANDAS but ideally i would like to understand how best to approach this with a pyspark UDF.

I'd initially thought about something like the following with PANDAS but also struggled to implement this to just fill ignoring the aggregator as a first pass:

df = df.set_index(keys=[df.timestamp]).resample('1H', fill_method='ffill')

But ideally i'd like to avoid using PANDAS.

In the example below i have two missing rows of hourly data (labeled as MISSING).

| timestamp            | aggregator |
|----------------------|------------|
| 2018-12-27T09:00:00Z | A          |
| 2018-12-27T10:00:00Z | A          |
| MISSING              | MISSING    |
| 2018-12-27T12:00:00Z | A          |
| 2018-12-27T13:00:00Z | A          |
| 2018-12-27T09:00:00Z | B          |
| 2018-12-27T10:00:00Z | B          |
| 2018-12-27T11:00:00Z | B          |
| MISSING              | MISSING    |
| 2018-12-27T13:00:00Z | B          |
| 2018-12-27T14:00:00Z | B          |

The expected output here would be the following:

| timestamp            | aggregator |
|----------------------|------------|
| 2018-12-27T09:00:00Z | A          |
| 2018-12-27T10:00:00Z | A          |
| 2018-12-27T11:00:00Z | A          |
| 2018-12-27T12:00:00Z | A          |
| 2018-12-27T13:00:00Z | A          |
| 2018-12-27T09:00:00Z | B          |
| 2018-12-27T10:00:00Z | B          |
| 2018-12-27T11:00:00Z | B          |
| 2018-12-27T12:00:00Z | B          |
| 2018-12-27T13:00:00Z | B          |
| 2018-12-27T14:00:00Z | B          |

Appreciate the help.

Thanks.

Upvotes: 2

Views: 2141

Answers (1)

Ranga Vure
Ranga Vure

Reputation: 1932

Here is the solution, to fill the missing hours. using windows, lag and udf. With little modification it can extend to days as well.

from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql.functions import *
from dateutil.relativedelta import relativedelta

def missing_hours(t1, t2):
    return [t1 + relativedelta(hours=-x) for x in range(1, t1.hour-t2.hour)]

missing_hours_udf = udf(missing_hours, ArrayType(TimestampType()))

df = spark.read.csv('dates.csv',header=True,inferSchema=True)

window = Window.partitionBy("aggregator").orderBy("timestamp")

df_mising = df.withColumn("prev_timestamp",lag(col("timestamp"),1, None).over(window))\
       .filter(col("prev_timestamp").isNotNull())\
       .withColumn("timestamp", explode(missing_hours_udf(col("timestamp"), col("prev_timestamp"))))\
       .drop("prev_timestamp")

df.union(df_mising).orderBy("aggregator","timestamp").show()

which results

+-------------------+----------+
|          timestamp|aggregator|
+-------------------+----------+
|2018-12-27 09:00:00|         A|
|2018-12-27 10:00:00|         A|
|2018-12-27 11:00:00|         A|
|2018-12-27 12:00:00|         A|
|2018-12-27 13:00:00|         A|
|2018-12-27 09:00:00|         B|
|2018-12-27 10:00:00|         B|
|2018-12-27 11:00:00|         B|
|2018-12-27 12:00:00|         B|
|2018-12-27 13:00:00|         B|
|2018-12-27 14:00:00|         B|
+-------------------+----------+

Upvotes: 5

Related Questions