Reputation: 2743
So I have a PySpark DataFrame to which I want to add another column, using the value from the Section_1 column as a key into a Python dictionary. So basically take the value from the Section_1 cell, look it up in the dictionary, and fill the new column with the corresponding value, like below.
Original DataFrame:
DataId | ObjId | Name | Object | Section_1 |
---|---|---|---|---|
My data | Data name | Object name | rd.111 | rd.123 |
Python dictionary:
object_map = {'rd.123': 'rd.567'}
Here Section_1 has a value of rd.123, so I want to search the dictionary for the key 'rd.123', get back the value 'rd.567', and place that in the new column.
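For illustration, in plain Python the lookup I have in mind is just a dictionary access:
object_map = {'rd.123': 'rd.567'}
object_map.get('rd.123')  # returns 'rd.567'
object_map.get('rd.999')  # returns None for a missing key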
Desired DataFrame:
DataId | ObjId | Name | Object | Section_1 | Section_2 |
---|---|---|---|---|---|
My data | Data name | Object name | rd.111 | rd.123 | rd.567 |
Right now I get the error below with my current code, and I don't really know what I did wrong, as I am not too familiar with PySpark:
There is an incorrect call to a Column object in your code. Please review your code.
Here is the code I am currently using, where object_map is the Python dictionary:
test_df = output.withColumn('Section_2', object_map.get(output.Section_1.collect()))
Upvotes: 3
Views: 2191
Reputation: 42392
You can try this (adapted from this answer, with null handling added):
from itertools import chain
from pyspark.sql.functions import create_map, lit

object_map = {'rd.123': 'rd.567'}

# Build a literal map column from the dictionary: map('rd.123', 'rd.567')
mapping_expr = create_map([lit(x) for x in chain(*object_map.items())])

# Rows with a null Section_1 simply get a null Section_2; the cast keeps
# the column type consistent for the union below
df1 = df.filter(df['Section_1'].isNull()).withColumn(
    'Section_2', lit(None).cast('string')
)

# For the remaining rows, look up the Section_1 value in the map column
df2 = df.filter(df['Section_1'].isNotNull()).withColumn(
    'Section_2', mapping_expr[df['Section_1']]
)

result = df1.union(df2)  # unionAll is deprecated in newer Spark versions
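For a quick sanity check, here is a minimal, self-contained run; the toy rows below (including one with a null Section_1 to exercise the null handling) are made up for illustration:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy data mirroring the question, plus one row with a null Section_1
df = spark.createDataFrame(
    [('My data', 'Data name', 'Object name', 'rd.111', 'rd.123'),
     ('My data', 'Data name', 'Object name', 'rd.111', None)],
    ['DataId', 'ObjId', 'Name', 'Object', 'Section_1']
)

# After running the snippet above:
result.show()
# The first row gets Section_2 = 'rd.567' from the map; the row with a
# null Section_1 gets a null Section_2.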
Upvotes: 3