OrbisUnum

Reputation: 67

Mapping a column from arrays in PySpark

I'm new to working with PySpark DataFrames that store arrays in columns, and I'm looking for help mapping a column based on two PySpark DataFrames, one of which is a reference df.

Reference DataFrame (the number of Subgroups varies for each Group):

| Group | Subgroup | Size                | Type                 |
| ----- | -------- | ------------------- | -------------------- |
| A     | A1       | ['Small', 'Medium'] | ['A', 'B']           |
| A     | A2       | ['Small', 'Medium'] | ['C', 'D']           |
| B     | B1       | ['Small']           | ['A', 'B', 'C', 'D'] |

Source DataFrame:

| ID     | Size     | Type |
| ------ | -------- | ---- |
| ID_001 | 'Small'  | 'A'  |
| ID_002 | 'Medium' | 'B'  |
| ID_003 | 'Small'  | 'D'  |

In the result, each ID is checked against every Group, but is assigned at most one Subgroup per Group based on the reference df, with the result looking something like this:

| ID     | Size     | Type | A_Subgroup | B_Subgroup |
| ------ | -------- | ---- | ---------- | ---------- |
| ID_001 | 'Small'  | 'A'  | 'A1'       | 'B1'       |
| ID_002 | 'Medium' | 'B'  | 'A1'       | Null       |
| ID_003 | 'Small'  | 'D'  | 'A2'       | 'B1'       |
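
For anyone who wants to reproduce this, here is a minimal sketch of how the two example DataFrames could be built (the variable names `ref` and `source` match the answer below; the SparkSession setup is boilerplate, not part of the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Reference df: each row maps a (Size, Type) combination to a Subgroup.
ref = spark.createDataFrame(
    [('A', 'A1', ['Small', 'Medium'], ['A', 'B']),
     ('A', 'A2', ['Small', 'Medium'], ['C', 'D']),
     ('B', 'B1', ['Small'], ['A', 'B', 'C', 'D'])],
    ['Group', 'Subgroup', 'Size', 'Type']
)

# Source df: one scalar Size and Type per ID.
source = spark.createDataFrame(
    [('ID_001', 'Small', 'A'),
     ('ID_002', 'Medium', 'B'),
     ('ID_003', 'Small', 'D')],
    ['ID', 'Size', 'Type']
)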

Upvotes: 0

Views: 682

Answers (1)

mck

Reputation: 42332

You can do a left join using `array_contains` conditions on both array columns, then pivot the result on Group:

import pyspark.sql.functions as F

# Left-join each source row to every reference row whose Size and Type
# arrays contain that row's scalar values, then pivot so each Group
# becomes its own column holding the matched Subgroup.
result = source.alias('source').join(
    ref.alias('ref'),
    F.expr("""
        array_contains(ref.Size, source.Size) and
        array_contains(ref.Type, source.Type)
    """),
    'left'
).groupBy(
    'ID', source['Size'], source['Type']
).pivot('Group').agg(F.first('Subgroup'))

result.show()
+------+------+----+---+----+
|    ID|  Size|Type|  A|   B|
+------+------+----+---+----+
|ID_003| Small|   D| A2|  B1|
|ID_002|Medium|   B| A1|null|
|ID_001| Small|   A| A1|  B1|
+------+------+----+---+----+
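
If you want the exact A_Subgroup / B_Subgroup column names from the question, a small follow-up sketch (my addition, not part of the original answer; it assumes the pivoted columns are exactly the Group values A and B, which you can guarantee by passing them explicitly, e.g. .pivot('Group', ['A', 'B'])):

# Rename each pivoted Group column to the <Group>_Subgroup style
# shown in the question's expected output.
for g in ['A', 'B']:
    result = result.withColumnRenamed(g, g + '_Subgroup')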

Upvotes: 1
