I need to merge two dataframes on an identifier and on a condition that a date in one dataframe falls between two dates in the other dataframe, then group by and sum another column.
Dataframe A has an ID ("id"), a date ("date") and a number ("number"):
| id | date | number |
|---|---|---|
| 101 | 2018-12-01 | 250 |
| 101 | 2018-12-02 | 150 |
| 102 | 2018-11-25 | 1000 |
| 102 | 2018-10-26 | 2000 |
| 102 | 2018-09-25 | 5000 |
| 103 | 2018-10-26 | 200 |
| 103 | 2018-10-27 | 2000 |
Dataframe B has an ID ("id"), a start date ("fromdate") and an end date ("todate"):
| id | fromdate | todate |
|---|---|---|
| 101 | 2018-10-01 | 2018-11-01 |
| 101 | 2018-11-02 | 2018-12-30 |
| 102 | 2018-09-01 | 2018-09-30 |
| 102 | 2018-10-01 | 2018-12-31 |
| 103 | 2018-10-01 | 2018-10-30 |
| 104 | 2018-10-01 | 2018-10-30 |
Now I need to merge these two dataframes on id and date, then sum the numbers accordingly. For example, consider the fourth row of dataframe B: for id 102, two rows of dataframe A (rows 3 and 4) fall between those dates, so they should be merged by summing their numbers (1000 + 2000).
So the resulting row would be:
| id | fromdate | todate | sum |
|---|---|---|---|
| 102 | 2018-10-01 | 2018-12-31 | 3000 |
The end result should be:
| id | fromdate | todate | sum |
|---|---|---|---|
| 101 | 2018-10-01 | 2018-11-01 | 0 |
| 101 | 2018-11-02 | 2018-12-30 | 400 |
| 102 | 2018-09-01 | 2018-09-30 | 5000 |
| 102 | 2018-10-01 | 2018-12-31 | 3000 |
| 103 | 2018-10-01 | 2018-10-30 | 2200 |
| 104 | 2018-10-01 | 2018-10-30 | 0 |
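To pin down the semantics before reaching for Spark, here is a minimal plain-Python sketch (no PySpark involved, so it runs anywhere) that reproduces the end result above: for every row of B, sum the numbers of the A rows with the same id whose date falls between fromdate and todate, inclusive, and use 0 when nothing matches. The helper name `interval_sums` is hypothetical.

```python
from datetime import date

# Sample data from the question. Dataframe A: (id, date, number).
rows_a = [
    (101, "2018-12-01", 250),
    (101, "2018-12-02", 150),
    (102, "2018-11-25", 1000),
    (102, "2018-10-26", 2000),
    (102, "2018-09-25", 5000),
    (103, "2018-10-26", 200),
    (103, "2018-10-27", 2000),
]
# Dataframe B: (id, fromdate, todate).
rows_b = [
    (101, "2018-10-01", "2018-11-01"),
    (101, "2018-11-02", "2018-12-30"),
    (102, "2018-09-01", "2018-09-30"),
    (102, "2018-10-01", "2018-12-31"),
    (103, "2018-10-01", "2018-10-30"),
    (104, "2018-10-01", "2018-10-30"),
]


def interval_sums(a, b):
    """Hypothetical helper: for each B row, sum A.number over A rows with the
    same id and fromdate <= date <= todate; the sum is 0 when none match."""
    parsed_a = [(i, date.fromisoformat(d), n) for i, d, n in a]
    result = []
    for i, frm, to in b:
        lo, hi = date.fromisoformat(frm), date.fromisoformat(to)
        # sum() of an empty generator is 0, which covers intervals with no rows
        total = sum(n for j, d, n in parsed_a if j == i and lo <= d <= hi)
        result.append((i, frm, to, total))
    return result


for row in interval_sums(rows_a, rows_b):
    print(row)
```

A nested loop like this is O(len(A) × len(B)), so it is only meant as a reference for checking output, not as a substitute for a distributed join.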
Here is a detailed approach you can follow:
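from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Reuse the active session (e.g. in the pyspark shell or a notebook) or create a new one
spark = SparkSession.builder.getOrCreate()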
################
##Define Schema
################
schema1 = StructType([StructField('id', IntegerType(), True),
StructField('date', StringType(), True),
StructField('number', IntegerType(), True)
]
)
schema2 = StructType([StructField('id', IntegerType(), True),
StructField('fromdate', StringType(), True),
StructField('todate', StringType(), True)
]
)
################
##Prepare Data
################
data1 = [
(101,'2018-12-01',250 ),
(101,'2018-12-02',150 ),
(102,'2018-11-25',1000),
(102,'2018-10-26',2000),
(102,'2018-09-25',5000),
(103,'2018-10-26',200 ),
(103,'2018-10-27',2000)
]
data2 = [
(101,'2018-10-01','2018-11-01'),
(101,'2018-11-02','2018-12-30'),
(102,'2018-09-01','2018-09-30'),
(102,'2018-10-01','2018-12-31'),
(103,'2018-10-01','2018-10-30'),
(104,'2018-10-01','2018-10-30')
]
################
##Create dataframe and type cast to date
################
df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)
df1 = df1.select(df1.id,df1.date.cast("date"),df1.number)
df2 = df2.select(df2.id,df2.fromdate.cast("date"),df2.todate.cast("date"))
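The casts turn the string columns into real dates, so the range check later compares calendar dates rather than text. As a plain-Python analogy (not Spark code): zero-padded ISO strings happen to sort chronologically, but a string without padding does not, whereas parsed dates always compare by calendar order. The unpadded value `'2018-9-25'` is a hypothetical input, not part of the question's data.

```python
from datetime import date, datetime

# Zero-padded ISO strings compare lexicographically in chronological order...
padded_ok = "2018-09-25" < "2018-10-01"
# ...but an unpadded month breaks string comparison ('9' sorts after '1').
unpadded_ok = "2018-9-25" < "2018-10-01"
# Parsing to a date first gives the correct chronological comparison.
parsed_ok = datetime.strptime("2018-9-25", "%Y-%m-%d").date() < date(2018, 10, 1)

print(padded_ok, unpadded_ok, parsed_ok)
```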
Then define the join condition and join the dataframes:
################
##Define Joining Condition
################
cond = [df1.id == df2.id, df1.date.between(df2.fromdate,df2.todate)]
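`Column.between(lower, upper)` is inclusive on both ends, so the second condition is equivalent to `fromdate <= date <= todate`. The plain-Python predicate below is only a stand-in for the Spark condition (the name `in_range` is hypothetical); it shows that dates falling exactly on either boundary are counted.

```python
from datetime import date


def in_range(d, lower, upper):
    """Plain-Python equivalent of Spark's inclusive between()."""
    return lower <= d <= upper


lower, upper = date(2018, 10, 1), date(2018, 10, 30)
on_lower = in_range(date(2018, 10, 1), lower, upper)    # lower bound: included
on_upper = in_range(date(2018, 10, 30), lower, upper)   # upper bound: included
past_upper = in_range(date(2018, 10, 31), lower, upper) # one day past: excluded
print(on_lower, on_upper, past_upper)
```

If you need a half-open interval instead, write the condition explicitly, for example `(df1.date >= df2.fromdate) & (df1.date < df2.todate)`.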
################
##Join dataframes using joining condition "cond" and aggregation
################
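from pyspark.sql import functions as F
# The left join keeps every interval from df2, even those with no matching rows in df1;
# coalesce turns the null sum of such an interval into 0, and alias names the column "sum"
result = df2.\
join(df1, cond, 'left').\
select(df2.id, df1.number, df2.fromdate, df2.todate).\
groupBy('id', 'fromdate', 'todate').\
agg(F.coalesce(F.sum('number'), F.lit(0)).alias('sum')).\
orderBy('id', 'fromdate')
result.show()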
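The `'left'` join type is what keeps B rows with no matching A rows (id 104 and the first 101 interval) in the output: each such row joins to a single row whose `number` is null, the sum over only nulls is null, and that null is then turned into 0 (with `fillna(0)` or `coalesce`). With an inner join those intervals would disappear. The plain-Python sketch below is not Spark; `join_then_sum` is a hypothetical helper that mimics both join types on a two-interval subset of the question's data, with `None` standing in for SQL null.

```python
from datetime import date

# Subset of the question's data: A = (id, date, number), B = (id, fromdate, todate).
A = [(103, date(2018, 10, 26), 200), (103, date(2018, 10, 27), 2000)]
B = [(103, date(2018, 10, 1), date(2018, 10, 30)),
     (104, date(2018, 10, 1), date(2018, 10, 30))]


def join_then_sum(a, b, how):
    """Hypothetical helper mimicking join(cond, how) + groupBy + sum + null -> 0."""
    out = []
    for i, lo, hi in b:
        matches = [n for j, d, n in a if j == i and lo <= d <= hi]
        if not matches and how == "left":
            matches = [None]  # left join: one output row with a null number
        if not matches:
            continue  # inner join: the unmatched B row is dropped entirely
        non_null = [n for n in matches if n is not None]
        total = sum(non_null) if non_null else None  # SQL sum ignores nulls
        out.append((i, lo, hi, 0 if total is None else total))  # null -> 0
    return out


left_rows = join_then_sum(A, B, "left")
inner_rows = join_then_sum(A, B, "inner")
print(left_rows)
print(inner_rows)
```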