Reputation: 149
I'm currently attempting to perform a date-dependent aggregation on a column from one table using dates from another table. Table 1 contains user IDs and dates (plus other information that is not to be aggregated). Table 2 contains the values I wish to aggregate, along with the same IDs and distinct dates.
The goal is to aggregate values from Table 2 only if they precede the date on a row in Table 1.
In the illustration below, the desired aggregation function is "mean"; however, a general PySpark (or SQL) solution that allows this aggregation function to be either a built-in (F.mean, F.sum) or a custom user-defined function would be ideal.
Table 1 - Date table (note: user IDs can be repeated in both tables)
+----+----------+----------+
|USER|   DATE   |USER_STATE|
+----+----------+----------+
|  3 | 7/1/2019 | Current  |
|  1 | 6/9/2019 | Expired  |
|  1 | 1/1/2019 | Current  |
+----+----------+----------+
Table 2 - Aggregation table
+----+----------+----------+
|USER|CHARGEDATE|AMOUNTPAID|
+----+----------+----------+
|  1 | 7/1/2018 |    10.00 |
|  1 | 5/1/2019 |    40.00 |
|  1 | 2/2/2019 |    10.00 |
|  3 | 1/2/2019 |    15.00 |
+----+----------+----------+
Desired Output - aggregation (mean) is calculated per user and is dependent on CHARGEDATE being prior to DATE in table 1
+----+----------+----------+---------------+
|USER|   DATE   |USER_STATE|MEAN_AMOUNTPAID|
+----+----------+----------+---------------+
|  3 | 7/1/2019 | Current  |         15.00 |
|  1 | 6/9/2019 | Expired  |         20.00 |
|  1 | 1/1/2019 | Current  |         10.00 |
+----+----------+----------+---------------+
Row 2 - includes all of user 1's Table 2 values, because every CHARGEDATE is earlier than DATE.
Row 3 - includes only user 1's first Table 2 value (7/1/2018), because it's the only CHARGEDATE earlier than DATE.
I'm aware this could be accomplished, inefficiently, by looping over each row in Table 1, getting that row's DATE, and using it to query the second table. I'm looking for a solution without loops if possible. Thanks in advance!
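For reference, here is a minimal sketch that reproduces the two tables as DataFrames (assuming an active SparkSession named spark; the variable names data and aggregation are just illustrative):
# A sketch of the two example tables as DataFrames,
# assuming an active SparkSession named `spark`.
data = spark.createDataFrame(
    [(3, "7/1/2019", "Current"),
     (1, "6/9/2019", "Expired"),
     (1, "1/1/2019", "Current")],
    ["USER", "DATE", "USER_STATE"])

aggregation = spark.createDataFrame(
    [(1, "7/1/2018", 10.00),
     (1, "5/1/2019", 40.00),
     (1, "2/2/2019", 10.00),
     (3, "1/2/2019", 15.00)],
    ["USER", "CHARGEDATE", "AMOUNTPAID"])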
Upvotes: 2
Views: 836
Reputation: 8410
The PySpark way to do this involves converting your DATE and CHARGEDATE columns to DateType so you can filter on DATE > CHARGEDATE. I assumed your dates are in the format "M/d/yyyy"; if it's the other way around, just replace that with "d/M/yyyy".
#data.show()
#+----+--------+----------+
#|USER| DATE|USER_STATE|
#+----+--------+----------+
#| 3|7/1/2019| Current|
#| 1|6/9/2019| Expired|
#| 1|1/1/2019| Current|
#+----+--------+----------+
#aggregation.show()
#+----+----------+----------+
#|USER|CHARGEDATE|AMOUNTPAID|
#+----+----------+----------+
#| 1| 7/1/2018| 10.0|
#| 1| 5/1/2019| 40.0|
#| 1| 2/2/2019| 10.0|
#| 3| 1/2/2019| 15.0|
#+----+----------+----------+
from pyspark.sql import functions as F
data.join(aggregation, ['USER'])\
    .withColumn("DATE", F.to_date("DATE", "M/d/yyyy"))\
    .withColumn("CHARGEDATE", F.to_date("CHARGEDATE", "M/d/yyyy"))\
    .filter("DATE > CHARGEDATE")\
    .groupBy("USER", "DATE", "USER_STATE")\
    .agg(F.mean("AMOUNTPAID").alias("mean_amount_paid"))\
    .show()
+----+----------+----------+----------------+
|USER| DATE|USER_STATE|mean_amount_paid|
+----+----------+----------+----------------+
| 1|2019-06-09| Expired| 20.0|
| 1|2019-01-01| Current| 10.0|
| 3|2019-07-01| Current| 15.0|
+----+----------+----------+----------------+
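If you need an aggregation other than the mean, you can swap the expression passed to agg (e.g. F.sum); for a fully custom aggregation, one option is a grouped-aggregate pandas UDF. A sketch, assuming Spark 2.3+ with PyArrow installed (the custom_mean name and its body are placeholders for your own logic):
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType

# Built-in aggregations: simply change the function passed to agg(),
# e.g. .agg(F.sum("AMOUNTPAID").alias("total_amount_paid"))

# Custom aggregation: a grouped-aggregate pandas UDF that receives a
# pandas Series per group and returns a single value.
@pandas_udf(DoubleType(), PandasUDFType.GROUPED_AGG)
def custom_mean(amounts):
    return amounts.mean()   # replace with your own aggregation logic

data.join(aggregation, ['USER'])\
    .withColumn("DATE", F.to_date("DATE", "M/d/yyyy"))\
    .withColumn("CHARGEDATE", F.to_date("CHARGEDATE", "M/d/yyyy"))\
    .filter("DATE > CHARGEDATE")\
    .groupBy("USER", "DATE", "USER_STATE")\
    .agg(custom_mean("AMOUNTPAID").alias("custom_amount_paid"))\
    .show()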
Upvotes: 4
Reputation: 7503
Try the following; you can also check it out on sqlfiddle:
select
d.users,
date,
user_state,
avg(amount) as mean_amount_paid
from data d
join aggregation a
on d.users = a.users
where d.date > a.ch_date
group by
d.users,
date,
user_state
Output:
+-------+------------+------------+------------------+
| users | date       | user_state | mean_amount_paid |
+-------+------------+------------+------------------+
| 1     | 2019-01-01 | Current    | 10               |
| 1     | 2019-06-09 | Expired    | 20               |
| 3     | 2019-07-01 | Current    | 15               |
+-------+------------+------------+------------------+
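If you'd rather run the same query from PySpark with the question's column names, here is a sketch (assuming the DataFrames are named data and aggregation and an active SparkSession named spark):
from pyspark.sql import functions as F

# Convert the string dates and register temp views so Spark SQL can query them.
data.withColumn("DATE", F.to_date("DATE", "M/d/yyyy"))\
    .createOrReplaceTempView("data")
aggregation.withColumn("CHARGEDATE", F.to_date("CHARGEDATE", "M/d/yyyy"))\
    .createOrReplaceTempView("aggregation")

spark.sql("""
    SELECT d.USER, d.DATE, d.USER_STATE,
           AVG(a.AMOUNTPAID) AS mean_amount_paid
    FROM data d
    JOIN aggregation a
      ON d.USER = a.USER
    WHERE d.DATE > a.CHARGEDATE
    GROUP BY d.USER, d.DATE, d.USER_STATE
""").show()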
Upvotes: 1