MAC

Reputation: 172

PySpark DF Pivot and Create Array Columns

I have an input PySpark df:

+---------+-------+--------+----------+----------+
|timestamp|user_id|results |event_name|product_id|
+---------+-------+--------+----------+----------+
|1000     |user_1 |result 1|Click     |1         |
|1001     |user_1 |result 1|View      |1         |
|1002     |user_1 |result 2|Click     |3         |
|1003     |user_1 |result 2|View      |4         |
|1004     |user_1 |result 2|View      |5         |
+---------+-------+--------+----------+----------+

root
 |-- timestamp: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- results: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- product_id: string (nullable = true)

I'd like to convert this to the following, making sure that I keep unique combinations of user_id and results, and aggregate product_ids based on the given event_name, like this:

+-------+--------+---------------+---------------+
|user_id|results |product_clicked|products_viewed|
+-------+--------+---------------+---------------+
|user_1 |result 1|[1]            |[1]            |
|user_1 |result 2|[3]            |[4,5]          |
+-------+--------+---------------+---------------+

root
 |-- user_id: string (nullable = true)
 |-- results: string (nullable = true)
 |-- product_clicked: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- products_viewed: array (nullable = true)
 |    |-- element: string (containsNull = true)

I have looked into pivot; it's close, but I do not need its aggregation part. Instead, I need arrays built in columns that are created based on the event_name column. I cannot figure out how to do it.

NOTE: The order in the product_clicked and products_viewed columns above is important and is based on the timestamp column of the input DataFrame.
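For reference, a minimal snippet to reproduce the input DataFrame (values are copied from the table above; timestamp is kept as a plain long here for brevity rather than a true timestamp type):

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data from the table above.
df = spark.createDataFrame(
    [(1000, 'user_1', 'result 1', 'Click', '1'),
     (1001, 'user_1', 'result 1', 'View',  '1'),
     (1002, 'user_1', 'result 2', 'Click', '3'),
     (1003, 'user_1', 'result 2', 'View',  '4'),
     (1004, 'user_1', 'result 2', 'View',  '5')],
    ['timestamp', 'user_id', 'results', 'event_name', 'product_id'],
)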

Upvotes: 1

Views: 471

Answers (1)

mck

Reputation: 42422

You can use collect_list during the pivot aggregation:

import pyspark.sql.functions as F

# Group by the key columns, pivot on event_name, and collect the
# product_ids of each event type into an array.
df2 = (df.groupBy('user_id', 'results')
         .pivot('event_name')
         .agg(F.collect_list('product_id'))
         .selectExpr("user_id", "results", "Click as product_clicked", "View as product_viewed")
      )

df2.show()
+-------+--------+---------------+--------------+
|user_id| results|product_clicked|product_viewed|
+-------+--------+---------------+--------------+
| user_1|result 2|            [3]|        [4, 5]|
| user_1|result 1|            [1]|           [1]|
+-------+--------+---------------+--------------+
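As a side note, if the set of event names is known up front (here, assuming only Click and View occur), you can pass them to pivot explicitly, which lets Spark skip the extra pass it otherwise runs to discover the distinct pivot values:

df2 = (df.groupBy('user_id', 'results')
         .pivot('event_name', ['Click', 'View'])  # skip the distinct-values scan
         .agg(F.collect_list('product_id'))
         .selectExpr("user_id", "results", "Click as product_clicked", "View as product_viewed")
      )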

To ensure ordering, you can collect a list of structs containing the timestamp, sort the list, and then transform it to keep only the product_id:

# Collect (timestamp, product_id) structs; sort_array orders structs by
# their first field (timestamp), then transform keeps only product_id.
df2 = (df.groupBy('user_id', 'results')
         .pivot('event_name')
         .agg(F.sort_array(F.collect_list(F.struct('timestamp', 'product_id'))))
         .selectExpr("user_id", "results", "transform(Click, x -> x.product_id) as product_clicked", "transform(View, x -> x.product_id) as product_viewed")
      )

df2.show()
+-------+--------+---------------+--------------+
|user_id| results|product_clicked|product_viewed|
+-------+--------+---------------+--------------+
| user_1|result 2|            [3]|        [4, 5]|
| user_1|result 1|            [1]|           [1]|
+-------+--------+---------------+--------------+
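One caveat worth checking on your data: if some user_id/results group never has a given event type, the corresponding pivoted column comes back null rather than as an empty array. A coalesce cleans that up (a hedged sketch, assuming the column names from the code above):

# Replace null arrays (groups with no Click/View events) with empty arrays.
df2 = (df2.withColumn('product_clicked', F.expr("coalesce(product_clicked, array())"))
          .withColumn('product_viewed', F.expr("coalesce(product_viewed, array())")))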

Upvotes: 4
