Reputation: 153
I currently have a dataset grouped into hourly increments by a variable "aggregator". There are gaps in this hourly data, and what I would ideally like to do is forward fill the missing rows from the prior row that has the same value in the aggregator column.
I've seen some solutions to similar problems using pandas, but ideally I would like to understand how best to approach this with a PySpark UDF.
I'd initially thought about something like the following with pandas, but I struggled to get even this working as a first pass that fills while ignoring the aggregator:
df = df.set_index('timestamp').resample('1H').ffill()
But ideally I'd like to avoid using pandas.
In the example below I have two missing rows of hourly data (labeled as MISSING).
| timestamp | aggregator |
|----------------------|------------|
| 2018-12-27T09:00:00Z | A |
| 2018-12-27T10:00:00Z | A |
| MISSING | MISSING |
| 2018-12-27T12:00:00Z | A |
| 2018-12-27T13:00:00Z | A |
| 2018-12-27T09:00:00Z | B |
| 2018-12-27T10:00:00Z | B |
| 2018-12-27T11:00:00Z | B |
| MISSING | MISSING |
| 2018-12-27T13:00:00Z | B |
| 2018-12-27T14:00:00Z | B |
The expected output here would be the following:
| timestamp | aggregator |
|----------------------|------------|
| 2018-12-27T09:00:00Z | A |
| 2018-12-27T10:00:00Z | A |
| 2018-12-27T11:00:00Z | A |
| 2018-12-27T12:00:00Z | A |
| 2018-12-27T13:00:00Z | A |
| 2018-12-27T09:00:00Z | B |
| 2018-12-27T10:00:00Z | B |
| 2018-12-27T11:00:00Z | B |
| 2018-12-27T12:00:00Z | B |
| 2018-12-27T13:00:00Z | B |
| 2018-12-27T14:00:00Z | B |
Appreciate the help.
Thanks.
Upvotes: 2
Views: 2141
Reputation: 1932
Here is a solution that fills the missing hours using a window, lag and a UDF. With a small modification it can be extended to days as well.
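from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, TimestampType
from pyspark.sql.functions import col, explode, lag, udf
from dateutil.relativedelta import relativedelta

def missing_hours(t1, t2):
    # t1 is the current timestamp, t2 the previous one in the same aggregator.
    # Use the full time difference rather than t1.hour - t2.hour so that
    # gaps crossing midnight are counted correctly.
    gap = int((t1 - t2).total_seconds() // 3600)
    return [t1 + relativedelta(hours=-x) for x in range(1, gap)]

missing_hours_udf = udf(missing_hours, ArrayType(TimestampType()))

df = spark.read.csv('dates.csv', header=True, inferSchema=True)
window = Window.partitionBy("aggregator").orderBy("timestamp")
# Pair each row with the previous timestamp of its aggregator, then emit
# one new row per missing hour; rows with no gap explode to nothing.
df_missing = df.withColumn("prev_timestamp", lag(col("timestamp"), 1, None).over(window))\
    .filter(col("prev_timestamp").isNotNull())\
    .withColumn("timestamp", explode(missing_hours_udf(col("timestamp"), col("prev_timestamp"))))\
    .drop("prev_timestamp")
df.union(df_missing).orderBy("aggregator", "timestamp").show()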
which results in:
+-------------------+----------+
| timestamp|aggregator|
+-------------------+----------+
|2018-12-27 09:00:00| A|
|2018-12-27 10:00:00| A|
|2018-12-27 11:00:00| A|
|2018-12-27 12:00:00| A|
|2018-12-27 13:00:00| A|
|2018-12-27 09:00:00| B|
|2018-12-27 10:00:00| B|
|2018-12-27 11:00:00| B|
|2018-12-27 12:00:00| B|
|2018-12-27 13:00:00| B|
|2018-12-27 14:00:00| B|
+-------------------+----------+
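The gap computation at the heart of the UDF can be checked in plain Python, without a Spark session. The following is only a minimal sketch: missing_steps and its step parameter are hypothetical names, not part of the answer's code. It walks forward from the previous timestamp instead of comparing hour fields, so it also covers gaps that cross midnight, and passing a one-day step illustrates the extension to days mentioned above.

```python
from datetime import datetime, timedelta


def missing_steps(current, previous, step=timedelta(hours=1)):
    """Return the timestamps strictly between previous and current.

    Hypothetical helper (not from the original answer): it steps forward
    from `previous` in increments of `step` until it reaches `current`.
    """
    filled = []
    t = previous + step
    while t < current:
        filled.append(t)
        t += step
    return filled


# Hourly gap that crosses midnight: 22:00 on the 27th to 01:00 on the 28th.
hourly = missing_steps(datetime(2018, 12, 28, 1), datetime(2018, 12, 27, 22))

# Daily gap: the same helper with a one-day step.
daily = missing_steps(
    datetime(2018, 12, 31), datetime(2018, 12, 28), step=timedelta(days=1)
)

print(hourly)
print(daily)
```

Assuming the same column order as in the answer (current timestamp first, previous timestamp second), a function like this should be usable in place of missing_hours when registering the UDF with an ArrayType(TimestampType()) return type.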
Upvotes: 5