I'm new to working with PySpark DataFrames that store arrays in columns, and I'm looking for help mapping a column based on two PySpark DataFrames, one of which is a reference DataFrame.
Reference DataFrame (the number of subgroups varies from group to group):
| Group | Subgroup | Size | Type |
| ---- | -------- | ------------------| --------------- |
|A | A1 |['Small','Medium'] | ['A','B'] |
|A | A2 |['Small','Medium'] | ['C','D'] |
|B | B1 |['Small'] | ['A','B','C','D']|
Source DataFrame:
| ID | Size | Type |
| ---- | -------- | ---------|
|ID_001 | 'Small' |'A' |
|ID_002 | 'Medium' |'B' |
|ID_003 | 'Small' |'D' |
In the result, each ID gets a column for every Group, and within each group it maps to at most one subgroup (Null if no subgroup matches), based on the reference DataFrame. The result should look something like this:
| ID | Size | Type | A_Subgroup | B_Subgroup |
| ---- | -------- | ---------| ---------- | ------------- |
|ID_001 | 'Small' |'A' | 'A1' | 'B1' |
|ID_002 | 'Medium' |'B' | 'A1' | Null |
|ID_003 | 'Small' |'D' | 'A2' | 'B1' |
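To pin down the rule the expected table encodes, here is a plain-Python sketch (not PySpark) run on the sample rows: for each source row and each group, it picks the subgroup whose Size and Type lists both contain the row's values, and uses None in place of Null when nothing matches. The function name assign_subgroups is hypothetical, and it assumes, as in the sample, that at most one subgroup per group matches (it takes the first match otherwise).

```python
# Plain-Python illustration of the matching rule, not Spark code.
# Sample data copied from the question's reference and source tables.
REF = [
    ("A", "A1", ["Small", "Medium"], ["A", "B"]),
    ("A", "A2", ["Small", "Medium"], ["C", "D"]),
    ("B", "B1", ["Small"], ["A", "B", "C", "D"]),
]
SOURCE = [("ID_001", "Small", "A"), ("ID_002", "Medium", "B"), ("ID_003", "Small", "D")]


def assign_subgroups(source, ref):
    """Hypothetical helper: one '<Group>_Subgroup' entry per group for every source row."""
    groups = sorted({g for g, *_ in ref})
    out = []
    for id_, size, typ in source:
        row = {"ID": id_, "Size": size, "Type": typ}
        for g in groups:
            # First subgroup of group g whose Size and Type lists contain this row's values;
            # None stands in for Null when no subgroup of g matches.
            row[f"{g}_Subgroup"] = next(
                (sg for grp, sg, sizes, types in ref
                 if grp == g and size in sizes and typ in types),
                None,
            )
        out.append(row)
    return out


for row in assign_subgroups(SOURCE, REF):
    print(row)
```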

You can join the two DataFrames using array_contains conditions and then pivot the result:
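```python
import pyspark.sql.functions as F

# Left-join each source row to every reference row whose Size and Type arrays
# both contain the source row's Size and Type values.
result = source.alias('source').join(
    ref.alias('ref'),
    F.expr("""
        array_contains(ref.Size, source.Size) and
        array_contains(ref.Type, source.Type)
    """),
    'left'
).groupBy(
    # Size and Type exist in both inputs, so take them from the source side.
    'ID', source['Size'], source['Type']
).pivot('Group').agg(F.first('Subgroup'))  # one column per Group holding the matched Subgroup

result.show()
```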
```
+------+------+----+---+----+
|    ID|  Size|Type|  A|   B|
+------+------+----+---+----+
|ID_003| Small|   D| A2|  B1|
|ID_002|Medium|   B| A1|null|
|ID_001| Small|   A| A1|  B1|
+------+------+----+---+----+
```