Aesop

Reputation: 71

Window & Aggregate functions in Pyspark SQL/SQL

After the answer by @Vaebhav I realized the question was not set up correctly, so I have edited it with his code snippet.

I have the following table

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, TimestampType, DoubleType

input_str = """
4219,2018-01-01 08:10:00,3.0,50.78,
4216,2018-01-02 08:01:00,5.0,100.84,
4217,2018-01-02 20:00:00,4.0,800.49,
4139,2018-01-03 11:05:00,1.0,400.0,
4170,2018-01-03 09:10:00,2.0,100.0,
4029,2018-01-06 09:06:00,6.0,300.55,
4029,2018-01-06 09:16:00,2.0,310.55,
4217,2018-01-06 09:36:00,5.0,307.55,
1139,2018-01-21 11:05:00,1.0,400.0,
2170,2018-01-21 09:10:00,2.0,100.0,
4218,2018-02-06 09:36:00,5.0,307.55,
4218,2018-02-06 09:36:00,5.0,307.55
""".split(",")

input_values = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str))
cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "customer_id,timestamp,quantity,price".split(',')))
n = len(input_values)
n_cols = 4
input_list = [tuple(input_values[i:i+n_cols]) for i in range(0,n,n_cols)]
sparkDF = sqlContext.createDataFrame(input_list,cols)
sparkDF = sparkDF.withColumn('customer_id',F.col('customer_id').cast(IntegerType()))\
                 .withColumn('timestamp',F.col('timestamp').cast(TimestampType()))\
                 .withColumn('quantity',F.col('quantity').cast(IntegerType()))\
                 .withColumn('price',F.col('price').cast(DoubleType()))

I want to calculate the aggregates as follows:

trxn_date   unique_cust_visits  next_7_day_visits  next_30_day_visits
2018-01-01  1                   7                  9
2018-01-02  2                   6                  8
2018-01-03  2                   4                  6
2018-01-06  2                   2                  4
2018-01-21  2                   2                  3
2018-02-06  1                   1                  1

where the next_7_day_visits and next_30_day_visits columns are look-ahead sums of unique_cust_visits over the next 7 and 30 days respectively (including the current day).

I want to write the code as a single SQL query.
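
(The SQL in the answers below queries a view named transactions, so the DataFrame above has to be registered first, as @Vaebhav also does in his answer:)

# make the DataFrame queryable from SQL as "transactions"
sparkDF.createOrReplaceTempView("transactions")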

Upvotes: 1

Views: 101

Answers (2)

Aesop

Reputation: 71

Building upon @Vaebhav's answer, the required query in this case is:

sqlContext.sql("""
        SELECT 
            TO_DATE(timestamp) as trxn_date
            ,COUNT(DISTINCT customer_id) as unique_cust_visits
            ,SUM(COUNT(DISTINCT customer_id)) OVER (
                        ORDER BY CAST(TO_DATE(timestamp) AS TIMESTAMP) DESC
                        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
            ) as next_7_day_visits
            ,SUM(COUNT(DISTINCT customer_id)) OVER (
                        ORDER BY CAST(TO_DATE(timestamp) AS TIMESTAMP) DESC
                        RANGE BETWEEN INTERVAL 30 DAYS PRECEDING AND CURRENT ROW
            ) as next_30_day_visits
        FROM transactions
        GROUP BY 1
        ORDER by trxn_date
""").show()
+----------+------------------+-----------------+------------------+
| trxn_date|unique_cust_visits|next_7_day_visits|next_30_day_visits|
+----------+------------------+-----------------+------------------+
|2018-01-01|                 1|                7|                 9|
|2018-01-02|                 2|                6|                 8|
|2018-01-03|                 2|                4|                 6|
|2018-01-06|                 2|                2|                 4|
|2018-01-21|                 2|                2|                 3|
|2018-02-06|                 1|                1|                 1|
+----------+------------------+-----------------+------------------+
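
For reference, a rough DataFrame-API sketch of the same look-ahead aggregation (my own translation, assuming the sparkDF and the functions-as-F import from the question) is below; expressing the date as a day offset lets rangeBetween cover the next 7 and 30 days:

from pyspark.sql.window import Window

# daily unique customer visits
daily = (sparkDF
         .groupBy(F.to_date('timestamp').alias('trxn_date'))
         .agg(F.countDistinct('customer_id').alias('unique_cust_visits')))

# express the date as a day offset so rangeBetween can work in day units;
# rangeBetween(0, 7) then covers the current day and the next 7 days
day_no = F.datediff(F.col('trxn_date'), F.to_date(F.lit('1970-01-01')))
w7 = Window.orderBy(day_no).rangeBetween(0, 7)
w30 = Window.orderBy(day_no).rangeBetween(0, 30)

(daily
 .withColumn('next_7_day_visits', F.sum('unique_cust_visits').over(w7))
 .withColumn('next_30_day_visits', F.sum('unique_cust_visits').over(w30))
 .orderBy('trxn_date')
 .show())

Neither window has a PARTITION BY, so Spark moves all rows to a single partition for the window step, which is fine at this data size.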

Upvotes: 0

Vaebhav

Reputation: 5062

You can achieve this by using a ROWS rather than a RANGE frame type; a good explanation can be found here

ROW - based on physical offsets from the position of the current input row

RANGE - based on logical offsets from the position of the current input row

Also, in your implementation a PARTITION BY clause would be redundant, as it won't create the required frames for a look-ahead.
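
To make the distinction concrete, here is a small self-contained sketch (my own illustration, using the same sql handle as the snippets below). The two rows sharing day_no = 3 occupy one logical position for the RANGE frame but two physical positions for the ROWS frame, so the two sums differ:

# toy example contrasting ROWS (physical offsets) with RANGE (logical offsets)
demo = sql.createDataFrame([(1,), (3,), (3,), (6,)], ["day_no"])
demo.createOrReplaceTempView("demo")

sql.sql("""
    SELECT
        day_no
        ,SUM(day_no) OVER (ORDER BY day_no
                           ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) as rows_sum
        ,SUM(day_no) OVER (ORDER BY day_no
                           RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING) as range_sum
    FROM demo
""").show()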

Data Preparation

input_str = """
4219,2018-01-02 08:10:00,3.0,50.78,
4216,2018-01-02 08:01:00,5.0,100.84,
4217,2018-01-02 20:00:00,4.0,800.49,
4139,2018-01-03 11:05:00,1.0,400.0,
4170,2018-01-03 09:10:00,2.0,100.0,
4029,2018-01-06 09:06:00,6.0,300.55,
4029,2018-01-06 09:16:00,2.0,310.55,
4217,2018-01-06 09:36:00,5.0,307.55
""".split(",")

input_values = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str))

cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "customer_id,timestamp,quantity,price".split(',')))
        
n = len(input_values)
n_cols = 4

input_list = [tuple(input_values[i:i+n_cols]) for i in range(0,n,n_cols)]

sparkDF = sql.createDataFrame(input_list,cols)

sparkDF = sparkDF.withColumn('customer_id',F.col('customer_id').cast(IntegerType()))\
                 .withColumn('timestamp',F.col('timestamp').cast(TimestampType()))\
                 .withColumn('quantity',F.col('quantity').cast(IntegerType()))\
                 .withColumn('price',F.col('price').cast(DoubleType()))

sparkDF.show()

+-----------+-------------------+--------+------+
|customer_id|          timestamp|quantity| price|
+-----------+-------------------+--------+------+
|       4219|2018-01-02 08:10:00|       3| 50.78|
|       4216|2018-01-02 08:01:00|       5|100.84|
|       4217|2018-01-02 20:00:00|       4|800.49|
|       4139|2018-01-03 11:05:00|       1| 400.0|
|       4170|2018-01-03 09:10:00|       2| 100.0|
|       4029|2018-01-06 09:06:00|       6|300.55|
|       4029|2018-01-06 09:16:00|       2|310.55|
|       4217|2018-01-06 09:36:00|       5|307.55|
+-----------+-------------------+--------+------+

Window Aggregates

sparkDF.createOrReplaceTempView("transactions")

sql.sql("""
        SELECT 
            TO_DATE(timestamp) as trxn_date
            ,COUNT(DISTINCT customer_id) as unique_cust_visits
            ,SUM(COUNT(DISTINCT customer_id)) OVER (
                        ORDER BY TO_DATE(timestamp)
                        ROWS BETWEEN CURRENT ROW AND 7 FOLLOWING
            ) as next_7_day_visits
        FROM transactions
        GROUP BY 1
""").show()

+----------+------------------+-----------------+
| trxn_date|unique_cust_visits|next_7_day_visits|
+----------+------------------+-----------------+
|2018-01-02|                 3|                7|
|2018-01-03|                 2|                4|
|2018-01-06|                 2|                2|
+----------+------------------+-----------------+

Upvotes: 1
