Reputation: 4512
Let's consider the following input data:
| incremental_id | session_start_id | session_end_id | items_bought |
|----------------|------------------|----------------|--------------|
| 1 | a | b | 1 |
| 2 | z | t | 7 |
| 3 | b | c | 0 |
| 4 | c | d | 3 |
Where:
- consecutive sessions of the same user are chained together: a session's session_start_id equals the session_end_id of the user's previous session, so rows 1, 3 and 4 form one chain (a → b → c → d);
- the 2nd row (session z → t) is instead related to a second user.
I want to be able to aggregate the above data so that I can get:
| session_start_id | items_bought |
|------------------|--------------|
| a | 4 |
| z | 7 |
How could this be done in PySpark (or alternatively in pure SQL)? I would like to avoid using UDFs in PySpark, but it's ok if that's the only way.
Thanks for your help!
Edit:
I have updated the example dataframe: the incremental_id alone cannot be used to order the rows as consecutive sessions.
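For reference, here is a minimal sketch of how the example dataframe can be built (assuming an existing SparkSession named spark):
# Build the example data from the table above
# (the SparkSession name `spark` is an assumption)
df = spark.createDataFrame(
    [
        (1, "a", "b", 1),
        (2, "z", "t", 7),
        (3, "b", "c", 0),
        (4, "c", "d", 3),
    ],
    ["incremental_id", "session_start_id", "session_end_id", "items_bought"],
)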
Upvotes: 3
Views: 591
Reputation: 663
If you are able to create temp tables and read the affected-row-count metadata (@@ROWCOUNT), then you can port this:
-- Assumed setup (not spelled out in the original answer): the working table must exist.
create table #CTESubs (
    session_start_id varchar(10),
    session_end_id   varchar(10),
    items_bought     int
);

-- Seed with chain roots: sessions whose start id never appears as an end id.
insert into #CTESubs
select
    session_start_id,
    session_end_id,
    items_bought
from #user_session
where
    session_start_id not in (select session_end_id from #user_session)

-- Extend the chains one hop per pass until no new rows are added.
while(@@ROWCOUNT <> 0)
begin
    insert into #CTESubs
    select distinct
        p.session_start_id,     -- carry the chain root forward
        c.session_end_id,
        c.items_bought
    from #user_session c
    inner join #CTESubs p on c.session_start_id = p.session_end_id
    where
        p.session_start_id not in (select session_end_id from #user_session)
        and c.session_end_id not in (select session_end_id from #CTESubs)
end

-- Sum items bought per chain root.
select
    session_start_id,
    sum(items_bought) as items_bought
from #CTESubs
group by
    session_start_id;
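For the PySpark side of the question, a minimal sketch of the same iterate-until-no-new-rows idea might look like this (my illustration, assuming the question's data is in a DataFrame named df; the names chains, frontier, root, last_end and bought are all hypothetical):
from pyspark.sql import functions as F

# All ids that ever appear as a session end (aliased to avoid join ambiguity)
ends = df.select(F.col("session_end_id").alias("end_id"))

# Chain roots: sessions whose start id never appears as an end id
frontier = (
    df.join(ends, df["session_start_id"] == ends["end_id"], "left_anti")
      .select(F.col("session_start_id").alias("root"),
              F.col("session_end_id").alias("last_end"),
              F.col("items_bought").alias("bought"))
)
chains = frontier

# Extend every chain by one hop until no chain grows any further
while True:
    step = (
        frontier.join(df, frontier["last_end"] == df["session_start_id"])
                .select(frontier["root"],
                        df["session_end_id"].alias("last_end"),
                        df["items_bought"].alias("bought"))
    )
    if step.rdd.isEmpty():
        break
    chains = chains.union(step)
    frontier = step

# Sum the items bought per chain root
chains.groupBy("root").agg(F.sum("bought").alias("items_bought")).show()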
Upvotes: 0
Reputation: 2696
Here is a PySpark version:
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Window over the full data (a single partition) so we can lag the session end id
win = Window.partitionBy().orderBy("incremental_id")

# Flag a user change: a row starts a new user when its session_start_id does not
# match the previous row's session_end_id
df = df.withColumn('user_boundary', F.lag(F.col("session_end_id"), 1).over(win) != F.col("session_start_id"))
df = df.withColumn('user_boundary', F.when(F.col("user_boundary").isNull(), F.lit(False)).otherwise(F.col("user_boundary")))

# Now create an artificial user id by cumulatively counting the boundaries
df = df.withColumn('user_id', F.sum(F.col("user_boundary").cast(IntegerType())).over(win))

# Aggregate
df.groupBy('user_id').agg(F.sum(F.col("items_bought")).alias("total_bought")).show()
+-------+------------+
|user_id|total_bought|
+-------+------------+
| 0| 4|
| 1| 7|
+-------+------------+
Upvotes: 0
Reputation: 2488
Common Table Expressions have been part of the SQL standard since SQL:1999. Using a recursive CTE, we can write the query below:
WITH cte(session_start_id, session_end_id, items_bought) AS (
    -- anchor: chain roots, i.e. start ids that never appear as an end id
    SELECT session_start_id, session_end_id, items_bought FROM user_session
    WHERE session_start_id NOT IN (SELECT session_end_id FROM user_session)
    UNION ALL
    -- recursive step: extend each chain, carrying the root's start id forward
    SELECT a.session_start_id, b.session_end_id, b.items_bought
    FROM cte a
    INNER JOIN user_session b ON a.session_end_id = b.session_start_id
)
SELECT session_start_id, SUM(items_bought) AS items_bought
FROM cte
GROUP BY session_start_id
Explanation:
The anchor member selects the chain roots, i.e. the sessions whose session_start_id never appears as a session_end_id. The recursive member then walks each chain one session at a time, always carrying the root's session_start_id forward. The final SELECT sums items_bought per root, producing one row per user.
SQL Fiddle link: http://sqlfiddle.com/#!4/ac98a/4/0
(Note: the fiddle uses Oracle, but any DB engine that supports recursive CTEs should work; some, like PostgreSQL, require spelling it WITH RECURSIVE.)
Upvotes: 1