edyvedy13
edyvedy13

Reputation: 2296

Calculating the rolling sums in pyspark

I have a dataframe that contains information on the daily sales and daily clicks. Before I want to run my analysis, I want to aggregate the data. To make myself clearer, I will try to explain it on an example dataframe

item_id date          Price  Sale   Click   Discount_code     
2       01.03.2019    10       1     10      NULL
2       01.03.2019    8        1     10      Yes
2       02.03.2019    10       0     4       NULL
2       03.03.2019    10       0     6       NULL
2       04.03.2019    6        0     15      NULL 
2       05.03.2019    6        0     14      NULL
2       06.03.2019    5        0     7       NULL 
2       07.03.2019    5        1     11      NULL
2       07.03.2019    5        1     11      NULL
2       08.03.2019    5        0     9       NULL

If there are two sales for the given day, I have two observations for that particular day. I want to convert my dataframe to the following one by collapsing observations by item_id and price:

item_id    Price  CSale  Discount_code Cclicks   firstdate   lastdate  
2           10       1      No             20    01.03.2019 03.03.2019
2           8        1      Yes            10    01.03.2019 01.03.2019
2           6        0      NULL           29    04.03.2019 05.03.2019
2           5        2      NULL           38    06.03.2019 08.03.2019 

Where CSale correponds to the cumulative sales for the given price and given item_id, Cclicks corresponds to the cumulative clicks for the given price and given item_id, firstdate is the first date on which the given item was available for the given price and lastdate is the last date on which the given item was available for the given price.

Upvotes: 1

Views: 138

Answers (1)

cph_sto
cph_sto

Reputation: 7585

According to the problem, OP wants to aggregate the DataFrame on the basis of item_id and Price.

# Creating the DataFrames
from pyspark.sql.functions import col, to_date, sum, min, max, first
df = sqlContext.createDataFrame([(2,'01.03.2019',10,1,10,None),(2,'01.03.2019',8,1,10,'Yes'),
                                 (2,'02.03.2019',10,0,4,None),(2,'03.03.2019',10,0,6,None),
                                 (2,'04.03.2019',6,0,15,None),(2,'05.03.2019',6,0,14,None),
                                 (2,'06.03.2019',5,0,7,None),(2,'07.03.2019',5,1,11,None),
                                 (2,'07.03.2019',5,1,11,None),(2,'08.03.2019',5,0,9,None)],
                                ('item_id','date','Price','Sale','Click','Discount_code'))
# Converting string column date to proper date
df = df.withColumn('date',to_date(col('date'),'dd.MM.yyyy'))
df.show()
+-------+----------+-----+----+-----+-------------+
|item_id|      date|Price|Sale|Click|Discount_code|
+-------+----------+-----+----+-----+-------------+
|      2|2019-03-01|   10|   1|   10|         null|
|      2|2019-03-01|    8|   1|   10|          Yes|
|      2|2019-03-02|   10|   0|    4|         null|
|      2|2019-03-03|   10|   0|    6|         null|
|      2|2019-03-04|    6|   0|   15|         null|
|      2|2019-03-05|    6|   0|   14|         null|
|      2|2019-03-06|    5|   0|    7|         null|
|      2|2019-03-07|    5|   1|   11|         null|
|      2|2019-03-07|    5|   1|   11|         null|
|      2|2019-03-08|    5|   0|    9|         null|
+-------+----------+-----+----+-----+-------------+

As can be seen in the printSchema below that the dataframe's date column is in date format.

df.printSchema()
root
 |-- item_id: long (nullable = true)
 |-- date: date (nullable = true)
 |-- Price: long (nullable = true)
 |-- Sale: long (nullable = true)
 |-- Click: long (nullable = true)
 |-- Discount_code: string (nullable = true)

Finally aggregating agg() the columns below. Just a caveat - Since Discount_code is a string column and we need to aggregate it as well, we will take the first non-Null value while grouping.

df = df.groupBy('item_id','Price').agg(sum('Sale').alias('CSale'),
                                       first('Discount_code',ignorenulls = True).alias('Discount_code'),
                                       sum('Click').alias('Cclicks'),
                                       min('date').alias('firstdate'),
                                       max('date').alias('lastdate'))
df.show()
+-------+-----+-----+-------------+-------+----------+----------+
|item_id|Price|CSale|Discount_code|Cclicks| firstdate|  lastdate|
+-------+-----+-----+-------------+-------+----------+----------+
|      2|    6|    0|         null|     29|2019-03-04|2019-03-05|
|      2|    5|    2|         null|     38|2019-03-06|2019-03-08|
|      2|    8|    1|          Yes|     10|2019-03-01|2019-03-01|
|      2|   10|    1|         null|     20|2019-03-01|2019-03-03|
+-------+-----+-----+-------------+-------+----------+----------+

Upvotes: 1

Related Questions