crash

Reputation: 4512

How to aggregate values across different columns in PySpark (or, failing that, SQL)?

Consider the following input data:

| incremental_id | session_start_id | session_end_id | items_bought |
|----------------|------------------|----------------|--------------|
| 1              | a                | b              | 1            |
| 2              | z                | t              | 7            |
| 3              | b                | c              | 0            |
| 4              | c                | d              | 3            |

Where:

I want to be able to aggregate the above data so that I can get:

How could this be done in PySpark (or, failing that, in pure SQL)? I would like to avoid UDFs in PySpark, but they are acceptable if there is no other way.

Thanks for your help!

Edit: I have updated the example dataframe; incremental_id alone cannot be used to order the rows as consecutive sessions.

Upvotes: 3

Views: 591

Answers (3)

Hasan Manzak

Reputation: 663

If your environment lets you create temp tables and read the affected-row count, you can port this T-SQL:

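-- Seed: sessions that start a chain (their start id is no other session's end id).
-- Assumes #CTESubs already exists with these three columns.
insert into #CTESubs
select
    session_start_id,
    session_end_id,
    items_bought
from #user_session
where
    session_start_id not in (select session_end_id from #user_session)

-- Keep extending chains until an iteration inserts no new rows.
while(@@ROWCOUNT <> 0)
begin
    insert into #CTESubs
    select distinct
        p.session_start_id,  -- carry the chain's first start id forward
        c.session_end_id,
        c.items_bought
    from #user_session c
        inner join #CTESubs p on c.session_start_id = p.session_end_id
    where
        p.session_start_id not in (select session_end_id from #user_session)
        and c.session_end_id not in (select session_end_id from #CTESubs)
end

-- Total per chain, keyed by the chain's first start id.
select
    session_start_id,
    sum(items_bought) items_bought
from #CTESubs
group by
    session_start_id;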
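For readers without a T-SQL environment, the same seed-then-extend loop can be sketched in plain Python. This is only an illustration of the idea, not a port to Spark: the function name `chain_totals` and the tuple layout are assumptions made for the example, and it extends only the rows added in the previous pass rather than re-joining the whole temp table.

```python
# Sample rows from the question: (session_start_id, session_end_id, items_bought)
user_session = [("a", "b", 1), ("z", "t", 7), ("b", "c", 0), ("c", "d", 3)]


def chain_totals(rows):
    """Sum items_bought per chain, keyed by the chain's first start id."""
    all_ends = {end for _, end, _ in rows}

    # Seed step: sessions whose start id is no other session's end id.
    expanded = [(s, e, n) for s, e, n in rows if s not in all_ends]
    frontier = list(expanded)

    # Loop step: extend each chain by one session per pass and
    # stop as soon as a pass adds nothing (the @@ROWCOUNT = 0 case).
    while frontier:
        seen_ends = {e for _, e, _ in expanded}
        frontier = [
            (root, e, n)
            for root, last_end, _ in frontier
            for s, e, n in rows
            if s == last_end and e not in seen_ends
        ]
        expanded.extend(frontier)

    # Final step: group by the chain's first start id and sum.
    totals = {}
    for root, _, n in expanded:
        totals[root] = totals.get(root, 0) + n
    return totals


totals = chain_totals(user_session)
print(totals)  # {'a': 4, 'z': 7}
```

In Spark, each pass of the loop would presumably become a join between the current frontier and the sessions dataframe, repeated until the frontier is empty.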

Upvotes: 0

ags29

Reputation: 2696

Here is a PySpark version

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Window over the full data (no partitioning) so we can lag the session end id
win = Window.orderBy("incremental_id")

# Flag a user change: the previous row's end id differs from this row's start id
df = df.withColumn('user_boundary', F.lag(F.col("session_end_id"), 1).over(win) != F.col("session_start_id"))
# The first row has no previous row, so its flag is null; treat it as "no change"
df = df.withColumn('user_boundary', F.when(F.col("user_boundary").isNull(), F.lit(False)).otherwise(F.col("user_boundary")))

# Running sum of the flags gives an artificial user id
df = df.withColumn('user_id', F.sum(F.col("user_boundary").cast(IntegerType())).over(win))

# Aggregate
df.groupby('user_id').agg(F.sum(F.col("items_bought")).alias("total_bought")).show()

+-------+------------+
|user_id|total_bought|
+-------+------------+
|      0|           4|
|      1|           7|
+-------+------------+
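The lag-and-running-sum logic depends on consecutive sessions being adjacent when sorted by incremental_id. The sketch below replays the same steps in plain Python (no Spark; the variable names are made up for the illustration) on the table as it appears in the question after its edit, where session z→t sits between a→b and b→c. Under that ordering the chain a→b→c→d is split into two groups.

```python
from itertools import accumulate

# Rows from the question's table:
# (incremental_id, session_start_id, session_end_id, items_bought)
rows = [
    (1, "a", "b", 1),
    (2, "z", "t", 7),
    (3, "b", "c", 0),
    (4, "c", "d", 3),
]

# Equivalent of the window ordered by incremental_id.
ordered = sorted(rows, key=lambda r: r[0])

# Equivalent of lag(session_end_id, 1): the previous row's end id, None for the first row.
prev_ends = [None] + [r[2] for r in ordered[:-1]]

# Boundary flag: True when the previous end id differs from this start id;
# the first row (no previous row) is treated as False.
boundaries = [
    prev is not None and prev != r[1]
    for prev, r in zip(prev_ends, ordered)
]

# Running sum of the flags gives the artificial user id.
user_ids = list(accumulate(int(b) for b in boundaries))

# Aggregate items_bought per artificial user id.
totals = {}
for uid, r in zip(user_ids, ordered):
    totals[uid] = totals.get(uid, 0) + r[3]

print(user_ids)  # [0, 1, 2, 2]
print(totals)    # {0: 1, 1: 7, 2: 3}
```

So this approach gives the totals 4 and 7 only when the rows of each chain are contiguous in incremental_id order.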

Upvotes: 0

Prasanna

Reputation: 2488

Common Table Expressions are part of SQL:1999.

Using a recursive CTE, we can use the query below:

WITH cte(session_start_id, session_end_id, items_bought) AS (
  SELECT session_start_id, session_end_id, items_bought
  FROM user_session
  WHERE session_start_id NOT IN (SELECT session_end_id FROM user_session)
  UNION ALL
  SELECT a.session_start_id, b.session_end_id, b.items_bought
  FROM cte a
    INNER JOIN user_session b ON a.session_end_id = b.session_start_id
)
SELECT session_start_id, SUM(items_bought)
FROM cte
GROUP BY session_start_id

Explanation:

  • In the anchor query, select all the records that do not have a parent (i.e., no other record's session_end_id equals the current session_start_id).
  • Recursively, join the cte's session_end_id with session_start_id from the table, carrying the first session_start_id of the chain forward.
  • Group the records by that first session_start_id and return the summed result.

SQL Fiddle link: http://sqlfiddle.com/#!4/ac98a/4/0
(Note: the fiddle uses Oracle, but any DB engine that supports recursive CTEs should work. Some engines, such as PostgreSQL and MySQL, require the keyword WITH RECURSIVE.)
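To try the recursive CTE without the fiddle, here is a minimal sketch that runs it from Python against an in-memory SQLite database, loaded with the sample rows from the question. SQLite is a swapped-in engine chosen only because it ships with Python. The column types are assumptions, and the query adds the RECURSIVE keyword and an ORDER BY for a stable result.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Table layout follows the question; the column types are assumed.
conn.execute(
    """
    CREATE TABLE user_session (
        incremental_id   INTEGER,
        session_start_id TEXT,
        session_end_id   TEXT,
        items_bought     INTEGER
    )
    """
)
conn.executemany(
    "INSERT INTO user_session VALUES (?, ?, ?, ?)",
    [(1, "a", "b", 1), (2, "z", "t", 7), (3, "b", "c", 0), (4, "c", "d", 3)],
)

query = """
WITH RECURSIVE cte(session_start_id, session_end_id, items_bought) AS (
    -- Anchor: sessions whose start id is no other session's end id.
    SELECT session_start_id, session_end_id, items_bought
    FROM user_session
    WHERE session_start_id NOT IN (SELECT session_end_id FROM user_session)
    UNION ALL
    -- Recursive step: follow end id -> start id, keeping the chain's first start id.
    SELECT a.session_start_id, b.session_end_id, b.items_bought
    FROM cte a
    INNER JOIN user_session b ON a.session_end_id = b.session_start_id
)
SELECT session_start_id, SUM(items_bought)
FROM cte
GROUP BY session_start_id
ORDER BY session_start_id
"""

result = conn.execute(query).fetchall()
conn.close()

print(result)  # [('a', 4), ('z', 7)]
```

The chain a→b→c→d sums to 1 + 0 + 3 = 4, and the single session z→t stays at 7.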

Upvotes: 1
