E. Camus

Reputation: 149

Aggregating column based on datetime column of another table PySpark/SQL

I'm currently attempting to perform a date-dependent aggregation on one table's column using dates from another table. Table 1 contains user IDs and dates (plus other information that is not to be aggregated). Table 2 contains the values I wish to aggregate, along with the same IDs and distinct dates.

The goal is to aggregate values from table 2 only if they precede the date on a row in table 1.

In the illustration below, the desired aggregation function is "mean"; however, it would be ideal to have a general PySpark (or SQL) solution that allows the aggregation function to be either a built-in (F.mean, F.sum) or a custom user-defined function.

Table 1 - Date table (note: user IDs can be repeated in both tables)

+------+----------+------------+
| USER |   DATE   | USER_STATE |
+------+----------+------------+
|    3 | 7/1/2019 | Current    |
|    1 | 6/9/2019 | Expired    |
|    1 | 1/1/2019 | Current    |
+------+----------+------------+

Table 2 - Aggregation table

+------+------------+------------+
| USER | CHARGEDATE | AMOUNTPAID |
+------+------------+------------+
|    1 | 7/1/2018   |      10.00 |
|    1 | 5/1/2019   |      40.00 |
|    1 | 2/2/2019   |      10.00 |
|    3 | 1/2/2019   |      15.00 |
+------+------------+------------+
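
For reference, here is a rough sketch of how the two sample tables above could be built as PySpark DataFrames (assuming an active spark session; the frame names data and aggregation are just placeholders, and the dates are kept as M/d/yyyy strings):

# Build the two sample tables shown above (assumes an active `spark` session)
data = spark.createDataFrame(
    [(3, "7/1/2019", "Current"),
     (1, "6/9/2019", "Expired"),
     (1, "1/1/2019", "Current")],
    ["USER", "DATE", "USER_STATE"])

aggregation = spark.createDataFrame(
    [(1, "7/1/2018", 10.00),
     (1, "5/1/2019", 40.00),
     (1, "2/2/2019", 10.00),
     (3, "1/2/2019", 15.00)],
    ["USER", "CHARGEDATE", "AMOUNTPAID"])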

Desired Output - the aggregation (mean) is calculated per row of Table 1 and includes only the Table 2 values for that user whose CHARGEDATE is prior to that row's DATE

+------+----------+------------+-----------------+
| USER |   DATE   | USER_STATE | MEAN_AMOUNTPAID |
+------+----------+------------+-----------------+
|    3 | 7/1/2019 | Current    |           15.00 |
|    1 | 6/9/2019 | Expired    |           20.00 |
|    1 | 1/1/2019 | Current    |           10.00 |
+------+----------+------------+-----------------+
Row 2 - includes all of user 1's Table 2 values, because every CHARGEDATE precedes 6/9/2019: (10.00 + 40.00 + 10.00) / 3 = 20.00
Row 3 - includes only user 1's first Table 2 value (10.00 on 7/1/2018), because it is the only CHARGEDATE earlier than 1/1/2019

I'm aware this could inefficiently be accomplished by running a loop on each row in Table 1, getting the DATE for that row, and using it to query the second table. I'm looking for a solution without loops if possible. Thanks in advance!

Upvotes: 2

Views: 836

Answers (2)

murtihash

Reputation: 8410

The PySpark way to do this involves converting your DATE and CHARGEDATE columns to DateType so that you can filter on DATE > CHARGEDATE. I assumed your dates are in the format "M/d/yyyy"; if it's the other way around, just replace that with "d/M/yyyy".

#data.show()
#+----+--------+----------+
#|USER|    DATE|USER_STATE|
#+----+--------+----------+
#|   3|7/1/2019|   Current|
#|   1|6/9/2019|   Expired|
#|   1|1/1/2019|   Current|
#+----+--------+----------+

#aggregation.show()
#+----+----------+----------+
#|USER|CHARGEDATE|AMOUNTPAID|
#+----+----------+----------+
#|   1|  7/1/2018|      10.0|
#|   1|  5/1/2019|      40.0|
#|   1|  2/2/2019|      10.0|
#|   3|  1/2/2019|      15.0|
#+----+----------+----------+

from pyspark.sql import functions as F
data.join(aggregation,['USER'])\
      .withColumn("DATE",F.to_date("DATE","M/d/yyyy"))\
      .withColumn("CHARGEDATE", F.to_date("CHARGEDATE", "M/d/yyyy"))\
      .filter("DATE>CHARGEDATE")\
      .groupBy("USER","DATE","USER_STATE").agg(F.mean("AMOUNTPAID").alias("mean_amount_paid"))\
      .show()

+----+----------+----------+----------------+
|USER|      DATE|USER_STATE|mean_amount_paid|
+----+----------+----------+----------------+
|   1|2019-06-09|   Expired|            20.0|
|   1|2019-01-01|   Current|            10.0|
|   3|2019-07-01|   Current|            15.0|
+----+----------+----------+----------------+
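
Since you asked for the aggregation to be swappable: the same join-convert-filter skeleton works unchanged, and only the expression passed to agg changes. A minimal sketch below, assuming PyArrow is installed for the custom case; the median_paid grouped-aggregate pandas UDF is just a hypothetical example of a user-defined aggregation:

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Hypothetical custom aggregation: a grouped-aggregate pandas UDF (requires PyArrow)
@F.pandas_udf(DoubleType(), F.PandasUDFType.GROUPED_AGG)
def median_paid(amounts):
    # `amounts` is a pandas Series holding one group's AMOUNTPAID values
    return float(amounts.median())

# Swap in any aggregate expression here, e.g. F.sum("AMOUNTPAID") or F.mean("AMOUNTPAID")
agg_expr = median_paid("AMOUNTPAID").alias("median_amount_paid")

data.join(aggregation, ["USER"])\
    .withColumn("DATE", F.to_date("DATE", "M/d/yyyy"))\
    .withColumn("CHARGEDATE", F.to_date("CHARGEDATE", "M/d/yyyy"))\
    .filter("DATE>CHARGEDATE")\
    .groupBy("USER", "DATE", "USER_STATE")\
    .agg(agg_expr)\
    .show()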

Upvotes: 4

zealous

Reputation: 7503

Try the following, and also check it out on sqlfiddle (note that it uses slightly different column names than the question: users, ch_date, and amount):

select
  d.users,
  date,
  user_state,
  avg(amount) as mean_amount_paid
from data d
join aggregation a
on d.users = a.users
where d.date > a.ch_date
group by
  d.users,
  date,
  user_state

Output:

+-------+------------+------------+------------------+
| users | date       | user_state | mean_amount_paid |
+-------+------------+------------+------------------+
|     1 | 2019-01-01 | Current    |               10 |
|     1 | 2019-06-09 | Expired    |               20 |
|     3 | 2019-07-01 | Current    |               15 |
+-------+------------+------------+------------------+
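
If you'd rather run this from PySpark than from a SQL console, a minimal sketch (assuming the DataFrames are named data and aggregation as in the other answer, and still hold the question's original column names with M/d/yyyy date strings) is to register them as temp views and pass the query to spark.sql:

# Register the DataFrames as temp views so the query can run via spark.sql
data.createOrReplaceTempView("data")
aggregation.createOrReplaceTempView("aggregation")

# to_date() parses the M/d/yyyy strings before the comparison
spark.sql("""
    select d.USER, d.DATE, d.USER_STATE,
           avg(a.AMOUNTPAID) as mean_amount_paid
    from data d
    join aggregation a
      on d.USER = a.USER
    where to_date(d.DATE, 'M/d/yyyy') > to_date(a.CHARGEDATE, 'M/d/yyyy')
    group by d.USER, d.DATE, d.USER_STATE
""").show()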

Upvotes: 1
