Kaharon

Reputation: 395

Filling gaps in time series in Spark for different entities

I have a data frame containing daily events related to various entities over time. I want to fill the gaps in those time series.

Here is the aggregated data I have (left) and the data I want to obtain (right):

+---------+----------+-------+               +---------+----------+-------+
|entity_id|      date|counter|               |entity_id|      date|counter|
+---------+----------+-------+               +---------+----------+-------+
|        3|2020-01-01|      7|               |        3|2020-01-01|      7|
|        1|2020-01-01|     10|               |        1|2020-01-01|     10|
|        2|2020-01-01|      3|               |        2|2020-01-01|      3|
|        2|2020-01-02|      9|               |        2|2020-01-02|      9|
|        1|2020-01-03|     15|               |        1|2020-01-02|      0|
|        2|2020-01-04|      3|               |        3|2020-01-02|      0|
|        1|2020-01-04|     14|               |        1|2020-01-03|     15|
|        2|2020-01-05|      6|               |        2|2020-01-03|      0|
+---------+----------+-------+               |        3|2020-01-03|      0|
                                             |        3|2020-01-04|      0|
                                             |        2|2020-01-04|      3|
                                             |        1|2020-01-04|     14|
                                             |        2|2020-01-05|      6|
                                             |        1|2020-01-05|      0|
                                             |        3|2020-01-05|      0|
                                             +---------+----------+-------+

I used this Stack Overflow topic, which was very useful: Filling gaps in timeseries Spark

Here is my code (filtered for only one entity). It is in Python, but I think the API is the same in Scala:

import pyspark.sql.functions as sf

a = 60 * 60 * 24  # range step: number of seconds in one day

(
    df
    .withColumn("date", sf.to_date("created_at"))
    .groupBy(
        sf.col("entity_id"),
        sf.col("date")
    )
    .agg(sf.count(sf.lit(1)).alias("counter"))
    .filter(sf.col("entity_id") == 1)
    .select(
        sf.col("date"),
        sf.col("counter")
    )
    .join(
        spark
        .range(
            # range start: the entity's minimum date, truncated to the day
            df
            .filter(sf.col("entity_id") == 1)
            .select(sf.unix_timestamp(sf.min("created_at")).alias("min"))
            .first().min // a * a,

            # range end: the entity's maximum date, rounded up to the next day
            (df
             .filter(sf.col("entity_id") == 1)
             .select(sf.unix_timestamp(sf.max("created_at")).alias("max"))
             .first().max // a + 1) * a,

            a  # range step
        )
        .select(sf.to_date(sf.from_unixtime("id")).alias("date")),
        ["date"],    # column which will be used for the join
        how="right"  # type of join
    )
    .withColumn("counter", sf.when(sf.isnull("counter"), 0).otherwise(sf.col("counter")))
    .sort(sf.col("date"))
    .show(200)
)

This works very well, but now I want to avoid the filter and fill the time series gaps for every entity (entity_id == 2, entity_id == 3, ...). For your information, the minimum and maximum of the date column can differ depending on the entity_id value; nevertheless, if your solution relies on the global minimum and maximum of the whole data frame, that is fine for me as well.
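
To make the per-entity bounds concrete, here is a small sketch (same df and created_at column as in my code above; the min_date/max_date aliases are just for illustration) that only computes the bounds per entity, not the gap filling itself:

import pyspark.sql.functions as sf

# minimum and maximum date per entity (illustration only)
(
    df
    .withColumn("date", sf.to_date("created_at"))
    .groupBy("entity_id")
    .agg(
        sf.min("date").alias("min_date"),
        sf.max("date").alias("max_date")
    )
    .show()
)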

If you need any other information, feel free to ask.

edit: added an example of the data I want to have

Upvotes: 3

Views: 1484

Answers (1)

werner

Reputation: 14905

When creating the elements of the date range, I would rather use the Pandas date_range function than Spark's range, as the Spark range function has some shortcomings when dealing with date values. The number of distinct dates is usually small: even for a time span of multiple years, it is small enough to be easily broadcast in a join.

from pyspark.sql import functions as F
import pandas as pd

# get the minimum and maximum date and collect them to the driver
min_date, max_date = df.select(F.min("date"), F.max("date")).first()

# use Pandas to create all dates and switch back to a PySpark DataFrame
timerange = pd.date_range(start=min_date, end=max_date, freq='1d')
all_dates = spark.createDataFrame(timerange.to_frame(), ['date'])

# get all combinations of dates and entity_ids
all_dates_and_ids = all_dates.crossJoin(df.select("entity_id").distinct())

# create the final result by doing a left join and filling null values with 0
result = all_dates_and_ids.join(df, on=['date', 'entity_id'], how="left_outer") \
    .fillna({'counter': 0}) \
    .orderBy(['date', 'entity_id'])

This gives

+-------------------+---------+-------+
|               date|entity_id|counter|
+-------------------+---------+-------+
|2020-01-01 00:00:00|        1|     10|
|2020-01-01 00:00:00|        2|      3|
|2020-01-01 00:00:00|        3|      7|
|2020-01-02 00:00:00|        1|      0|
|2020-01-02 00:00:00|        2|      9|
|2020-01-02 00:00:00|        3|      0|
|2020-01-03 00:00:00|        1|     15|
|2020-01-03 00:00:00|        2|      0|
|2020-01-03 00:00:00|        3|      0|
|2020-01-04 00:00:00|        1|     14|
|2020-01-04 00:00:00|        2|      3|
|2020-01-04 00:00:00|        3|      0|
|2020-01-05 00:00:00|        1|      0|
|2020-01-05 00:00:00|        2|      6|
|2020-01-05 00:00:00|        3|      0|
+-------------------+---------+-------+
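
If you want to make the broadcast mentioned above explicit, you can hint it when building the combinations. A minimal variant of the crossJoin line, using the same names as in the snippet (Spark will usually broadcast such a small frame on its own, so the hint is optional):

# optional: explicitly broadcast the tiny calendar of dates
all_dates_and_ids = df.select("entity_id").distinct() \
    .crossJoin(F.broadcast(all_dates))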

Upvotes: 2
