Reputation: 760
I have a list of transactions in which users take a board from one station to another. It is an array of arrays, called trans:
Board: User: Station: Action: Time:
[
['1', 'Ana', 'Tribeca', 'check_out', '1:00pm'],
['1', 'Ana', 'Soho' , 'park' , '2:00pm'],
['1', 'Bob', 'Soho' , 'check_out', '3:00pm'],
['1', 'Bob', 'Chelsea', 'park' , '4:00pm'],
['2'...]
]
(E.g. Board '1' was checked out by 'Ana' in 'Tribeca' at '1:00pm', parked in Soho at '2:00pm', and then checked out by 'Bob').
With this code, I group each board with its transactions:
# trans is an RDD of 5-element rows: [board, user, station, action, time]
board_groups = (trans
    .map(lambda oneTrans:
         (oneTrans[0],
          [oneTrans[1], oneTrans[2], oneTrans[3], oneTrans[4]]))
    .groupByKey()
    .mapValues(list))
giving:
('1', [
['Ana' , 'Tribeca' , 'check_out', '1:00pm'],
['Ana' , 'Soho' , 'park' , '2:00pm'],
['Bob' , 'Soho' , 'check_out', '3:00pm'],
['Bob' , 'Chelsea' , 'park' , '4:00pm' ]
]),
('2', ...)
How could I achieve this:
('1', ('Ana',
[
['Tribeca', 'check_out', '1:00pm'],
['Soho' , 'park' , '2:00pm']
]
),
('Bob',
[
['Soho' , 'check_out', '3:00pm'],
['Chelsea', 'park' , '4:00pm' ]
]
)
),
('2'...)
using Spark's methods?
Since each value of board_groups is an array of arrays, I tried to use mapValues on it, so that for each array within each value I could map to make each user a key, and then group the users together. However, I couldn't do this because it involved nested parallelization calls.
Upvotes: 0
Views: 117
Reputation: 1697
Suppose your data is
df.show
+-----+----+-------+---------+------+
|board|user|station| action| time|
+-----+----+-------+---------+------+
| 1| Ana|Tribeca|check_out|1:00pm|
| 1| Ana| Soho| park|2:00pm|
| 1| Bob| Soho|check_out|3:00pm|
| 1| Bob|Chelsea| park|4:00pm|
| 2| Tom|Chelsea| park|4:00pm|
+-----+----+-------+---------+------+
You can group your data using:
import org.apache.spark.sql.functions.{col, collect_list, struct}
// Needed for the 'board symbol syntax outside spark-shell; assumes a SparkSession named spark
import spark.implicits._
df.select(struct('board, 'user).as("group"), struct('station, 'action, 'time).as('item))
  .groupBy('group).agg(collect_list('item).as("items"))
  .select(col("group.board"), struct(col("group.user"), col("items")).as("user_action"))
  .groupBy('board).agg(collect_list('user_action))
The result will look like this:
+-----+--------------------------------------------------------------------------------------------------------------------------+
|board|collect_list(user_action) |
+-----+--------------------------------------------------------------------------------------------------------------------------+
|1 |[[Ana, [[Tribeca, check_out, 1:00pm], [Soho, park, 2:00pm]]], [Bob, [[Soho, check_out, 3:00pm], [Chelsea, park, 4:00pm]]]]|
|2 |[[Tom, [[Chelsea, park, 4:00pm]]]] |
+-----+--------------------------------------------------------------------------------------------------------------------------+
The code here is in Scala, and will be almost identical in Python.
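For the RDD form used in the question, the same two-step idea (group on board and user together, then regroup on board alone) might look roughly like the sketch below. It assumes trans is an RDD of 5-element rows as shown in the question. The helper names key_by_board_and_user, rekey_by_board and group_by_board_then_user are made up for illustration. Because each step is an ordinary map or groupByKey on the RDD itself, no nested parallelization is needed.

```python
def key_by_board_and_user(one_trans):
    """Turn one row into ((board, user), [station, action, time])."""
    board, user, station, action, time = one_trans
    return ((board, user), [station, action, time])


def rekey_by_board(pair):
    """Turn ((board, user), rows) into (board, (user, rows))."""
    (board, user), rows = pair
    return (board, (user, list(rows)))


def group_by_board_then_user(trans):
    """Group transactions per board, and per user within each board.

    Assumption: trans is an RDD of [board, user, station, action, time] rows.
    """
    return (trans
            .map(key_by_board_and_user)
            .groupByKey()            # one group per (board, user) pair
            .map(rekey_by_board)
            .groupByKey()            # one group per board
            .mapValues(list))        # materialize the iterable as a list
```

Calling group_by_board_then_user(trans).collect() should give pairs of the form ('1', [('Ana', [...]), ('Bob', [...])]). Two caveats: groupByKey does not promise any particular order within a group, so sort by time afterwards if order matters, and a user who takes the same board more than once will have all of those rows merged into a single group.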
Upvotes: 1