warner

Reputation: 155

Fill the data based on the date ranges in spark

I have a sample dataset. I want to fill in the missing dates with a quantity of 0, based on a start date and an end date (from 2016-01-01 to 2016-01-08).

id,date,quantity
1,2016-01-03,10
1,2016-01-04,20
1,2016-01-06,30
1,2016-01-07,20
2,2016-01-02,10
2,2016-01-03,10
2,2016-01-04,20
2,2016-01-06,20
2,2016-01-07,20

Based on the solution from the link below, I was able to implement a partial solution: Filling missing dates in spark dataframe column

Can someone please suggest how to fill in the missing dates for every id across the whole range, from start_date through end_date? Expected output:

id,date,quantity
1,2016-01-01,0
1,2016-01-02,0
1,2016-01-03,10
1,2016-01-04,20
1,2016-01-05,0
1,2016-01-06,30
1,2016-01-07,20
1,2016-01-08,0
2,2016-01-01,0
2,2016-01-02,10
2,2016-01-03,10
2,2016-01-04,20
2,2016-01-05,0
2,2016-01-06,20
2,2016-01-07,20
2,2016-01-08,0
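
For reference, a minimal sketch that recreates this sample dataframe (assuming an active SparkSession named spark; the to_date cast is my addition, so date is a real date column rather than a string):

from pyspark.sql.functions import to_date

# Sample data from the question above.
df = spark.createDataFrame([
    (1, '2016-01-03', 10), (1, '2016-01-04', 20), (1, '2016-01-06', 30), (1, '2016-01-07', 20),
    (2, '2016-01-02', 10), (2, '2016-01-03', 10), (2, '2016-01-04', 20), (2, '2016-01-06', 20),
    (2, '2016-01-07', 20)], ["id", "date", "quantity"]) \
    .withColumn('date', to_date('date'))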

Upvotes: 2

Views: 2408

Answers (2)

Lamanus

Reputation: 13541

If you specifically want to fill the null values with 0, then fillna also works well.

import pyspark.sql.functions as f

# Build one row per (id, date) over the full date range, then left-join the
# original data; missing quantities come back as null and are filled with 0.
df2 = df.select('id').distinct() \
  .withColumn('date', f.expr("explode(sequence(date('2016-01-01'), date('2016-01-08'), INTERVAL 1 DAY))"))

df2.join(df, ['id', 'date'], 'left').fillna(0).orderBy('id', 'date').show(20, False)

+---+----------+--------+
|id |date      |quantity|
+---+----------+--------+
|1  |2016-01-01|0       |
|1  |2016-01-02|0       |
|1  |2016-01-03|10      |
|1  |2016-01-04|20      |
|1  |2016-01-05|0       |
|1  |2016-01-06|30      |
|1  |2016-01-07|20      |
|1  |2016-01-08|0       |
|2  |2016-01-01|0       |
|2  |2016-01-02|10      |
|2  |2016-01-03|10      |
|2  |2016-01-04|20      |
|2  |2016-01-05|0       |
|2  |2016-01-06|20      |
|2  |2016-01-07|20      |
|2  |2016-01-08|0       |
+---+----------+--------+
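
Note that a bare fillna(0) applies to every numeric column in the frame; if the real data had other nullable columns, a slightly safer variant (same logic, just scoped) restricts the fill to quantity:

# Restrict the zero-fill to the quantity column so any other
# nullable columns are left untouched.
df2.join(df, ['id', 'date'], 'left') \
   .fillna(0, subset=['quantity']) \
   .orderBy('id', 'date') \
   .show(20, False)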

Upvotes: 1

notNull

Reputation: 31460

From Spark 2.4 on, use the sequence function to generate all dates from 2016-01-01 to 2016-01-08.

  • Then join to the original dataframe and use coalesce to pick the quantity and id values.

Example:

from pyspark.sql.functions import lit

# Generate every date in the range, with placeholder quantity 0 and a
# hard-coded id of 1 (generalized to multiple ids in the update below).
df1=spark.sql("select explode(sequence(date('2016-01-01'),date('2016-01-08'),INTERVAL 1 DAY)) as date").\
withColumn("quantity",lit(0)).\
withColumn("id",lit(1))

df1.show()
#+----------+--------+---+
#|      date|quantity| id|
#+----------+--------+---+
#|2016-01-01|       0|  1|
#|2016-01-02|       0|  1|
#|2016-01-03|       0|  1|
#|2016-01-04|       0|  1|
#|2016-01-05|       0|  1|
#|2016-01-06|       0|  1|
#|2016-01-07|       0|  1|
#|2016-01-08|       0|  1|
#+----------+--------+---+

df.show()
#+---+----------+--------+
#| id|      date|quantity|
#+---+----------+--------+
#|  1|2016-01-03|      10|
#|  1|2016-01-04|      20|
#|  1|2016-01-06|      30|
#|  1|2016-01-07|      20|
#+---+----------+--------+


from pyspark.sql.functions import coalesce, col

# For every column except date, prefer the value from the original
# dataframe (df) and fall back to the generated placeholder (df1).
exprs=['date']+[coalesce(col(f'df.{c}'), col(f'df1.{c}')).alias(c) for c in df1.columns if c != 'date']

df1.\
alias("df1").\
join(df.alias("df"),['date'],'left').\
select(*exprs).\
orderBy("date").\
show()

#+----------+--------+---+
#|      date|quantity| id|
#+----------+--------+---+
#|2016-01-01|       0|  1|
#|2016-01-02|       0|  1|
#|2016-01-03|      10|  1|
#|2016-01-04|      20|  1|
#|2016-01-05|       0|  1|
#|2016-01-06|      30|  1|
#|2016-01-07|      20|  1|
#|2016-01-08|       0|  1|
#+----------+--------+---+

Update: the same approach, generalized to every id in the question's dataframe.

df=spark.createDataFrame([
    (1,'2016-01-03',10),(1,'2016-01-04',20),(1,'2016-01-06',30),(1,'2016-01-07',20),
    (2,'2016-01-02',10),(2,'2016-01-03',10),(2,'2016-01-04',20),(2,'2016-01-06',20),(2,'2016-01-07',20)],
    ["id","date","quantity"])

from pyspark.sql.functions import coalesce, col, lit

# One row per (id, date) over the full range, with placeholder quantity 0.
df1=df.selectExpr("id").distinct().selectExpr("id","explode(sequence(date('2016-01-01'),date('2016-01-08'),INTERVAL 1 DAY)) as date").withColumn("quantity",lit(0))

# Prefer values from the original dataframe; fall back to the placeholders.
exprs=[coalesce(col(f'df.{c}'), col(f'df1.{c}')).alias(c) for c in df1.columns]


df2=df1.alias("df1").join(df.alias("df"),(col("df1.date") == col("df.date"))& (col("df1.id") == col("df.id")),'left').select(*exprs)

df2.orderBy("id","date").show()
#+---+----------+--------+
#| id|      date|quantity|
#+---+----------+--------+
#|  1|2016-01-01|       0|
#|  1|2016-01-02|       0|
#|  1|2016-01-03|      10|
#|  1|2016-01-04|      20|
#|  1|2016-01-05|       0|
#|  1|2016-01-06|      30|
#|  1|2016-01-07|      20|
#|  1|2016-01-08|       0|
#|  2|2016-01-01|       0|
#|  2|2016-01-02|      10|
#|  2|2016-01-03|      10|
#|  2|2016-01-04|      20|
#|  2|2016-01-05|       0|
#|  2|2016-01-06|      20|
#|  2|2016-01-07|      20|
#|  2|2016-01-08|       0|
#+---+----------+--------+
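
The same generate-join-coalesce idea can also be expressed entirely in Spark SQL; a sketch, assuming df has been registered as a temp view (the view name sales is made up here):

df.createOrReplaceTempView("sales")

spark.sql("""
    SELECT d.id, d.date, coalesce(s.quantity, 0) AS quantity
    FROM (SELECT id,
                 explode(sequence(date('2016-01-01'), date('2016-01-08'), INTERVAL 1 DAY)) AS date
          FROM (SELECT DISTINCT id FROM sales) ids) d
    LEFT JOIN sales s
      ON d.id = s.id AND d.date = s.date
    ORDER BY d.id, d.date
""").show()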

Upvotes: 5
