Dan
Dan

Reputation: 760

How to avoid nested map calls in Spark?

I have a list of transactions where users take a board from one station to another. This is an array of arrays, called trans:

Board:  User:    Station:     Action:      Time:
[
 ['1', 'Ana', 'Tribeca', 'check_out', '1:00pm'],
 ['1', 'Ana', 'Soho'   , 'park'     , '2:00pm'],
 ['1', 'Bob', 'Soho'   , 'check_out', '3:00pm'],
 ['1', 'Bob', 'Chelsea', 'park'     , '4:00pm'],
 ['2'...]
]

(E.g. Board '1' was checked out by 'Ana' in 'Tribeca' at '1:00pm', parked in Soho at '2:00pm', and then checked out by 'Bob').

With this line of code, I group each board to its transactions:

board_groups = trans
    .map(lambda oneTrans: 
        (oneTrans[0], 
        [oneTrans[1], oneTrans[2], oneTrans[3], oneTrans[4]]))
    .groupByKey()
    .mapValues(list)

giving:

('1', [
       ['Ana' , 'Tribeca' , 'check_out', '1:00pm'],
       ['Ana' , 'Soho'    , 'park'     , '2:00pm'],
       ['Bob' , 'Soho'    , 'check_out', '3:00pm'],
       ['Bob' , 'Chelsea' , 'park'     , '4:00pm' ]
      ]),
('2', ...)

How could I achieve this:

('1', ('Ana',
       [
         ['Tribeca', 'check_out', '1:00pm'],
         ['Soho'   , 'park'     , '2:00pm']
       ]
      ),

      ('Bob',
       [
         ['Soho'   , 'check_out', '3:00pm'],
         ['Chelsea', 'park'     , '4:00pm' ]
       ]
      )
),
('2'...)

using Spark's methods?

Since the each value of board_groups is an array of arrays, I tried to use mapValue on it, so that for each array within each value, I could map to make each user a key, and then group users together. However I couldn't do this because this involved nested parallelization calls.

Upvotes: 0

Views: 117

Answers (1)

Fang Zhang
Fang Zhang

Reputation: 1697

Suppose your data is

df.show

+-----+----+-------+---------+------+
|board|user|station|   action|  time|
+-----+----+-------+---------+------+
|    1| Ana|Tribeca|check_out|1:00pm|
|    1| Ana|   Soho|     park|2:00pm|
|    1| Bob|   Soho|check_out|3:00pm|
|    1| Bob|Chelsea|     park|4:00pm|
|    2| Tom|Chelsea|     park|4:00pm|
+-----+----+-------+---------+------+

You can group your data using:

import org.apache.spark.sql.functions.collect_list

df.select(struct('board, 'user).as("group"), struct('station, 'action, 'time).as('item))
    .groupBy('group).agg(collect_list('item).as("items"))
    .select(col("group.board"), struct(col("group.user"), col("items")).as("user_action"))
    .groupBy('board).agg(collect_list('user_action))

The result will look like this:

+-----+--------------------------------------------------------------------------------------------------------------------------+
|board|collect_list(user_action)                                                                                                 |
+-----+--------------------------------------------------------------------------------------------------------------------------+
|1    |[[Ana, [[Tribeca, check_out, 1:00pm], [Soho, park, 2:00pm]]], [Bob, [[Soho, check_out, 3:00pm], [Chelsea, park, 4:00pm]]]]|
|2    |[[Tom, [[Chelsea, park, 4:00pm]]]]                                                                                        |
+-----+--------------------------------------------------------------------------------------------------------------------------+

The code here is in Scala, and will be almost identical in Python.

Upvotes: 1

Related Questions