Reputation: 21
I would like to take my dictionary which contains keywords and check a column in a pyspark df to see if that keyword exists and if so then return the value from the dictionary in a new column.
The problem looks like this;
myDict = {
'price': 'Pricing Issue',
'support': 'Support Issue',
'android': 'Left for Competitor'
}
df = sc.parallelize([('1','Needed better Support'),('2','Better value from android'),('3','Price was to expensive')]).toDF(['id','reason'])
+-----+-------------------------+
| id |reason |
+-----+-------------------------+
|1 |Needed better support |
|2 |Better value from android|
|3 | Price was to expensive |
|4 | Support problems |
+-----+-------------------------+
The end result that I am looking for is this:
+-----+-------------------------+---------------------+
| id |reason |new_reason |
+-----+-------------------------+---------------------+
|1 |Needed better support | Support Issue |
|2 |Better value from android| Left for Competitor |
|3 |Price was to expensive | Pricing Issue |
|4 |Support issue | Support Issue |
+-----+-------------------------+---------------------+
What's the best way to build an efficient function to do this in pyspark?
Upvotes: 0
Views: 1765
Reputation: 32700
You can use when
expressions to check whether the column reason
matches the dict keys. You can dynamically generate the when
expressions using python functools.reduce
function by passing the list myDict.keys()
:
from functools import reduce
from pyspark.sql import functions as F
df2 = df.withColumn(
"new_reason",
reduce(
lambda c, k: c.when(F.lower(F.col("reason")).rlike(rf"\b{k.lower()}\b"), myDict[k]),
myDict.keys(),
F
)
)
df2.show(truncate=False)
#+---+-------------------------+-------------------+
#|id |reason |new_reason |
#+---+-------------------------+-------------------+
#|1 |Needed better Support |Support Issue |
#|2 |Better value from android|Left for Competitor|
#|3 |Price was to expensive |Pricing Issue |
#|4 |Support problems |Support Issue |
#+---+-------------------------+-------------------+
Upvotes: 2
Reputation: 42392
You can create a keywords dataframe, and join to the original dataframe using an rlike
condition. I added \\\\b
before and after the keywords so that only words between word boundaries will be matched, and there won't be partial word matches (e.g. "pineapple" matching "apple").
import pyspark.sql.functions as F
keywords = spark.createDataFrame([[k,v] for (k,v) in myDict.items()]).toDF('key', 'new_reason')
result = df.join(
keywords,
F.expr("lower(reason) rlike '\\\\b' || lower(key) || '\\\\b'"),
'left'
).drop('key')
result.show(truncate=False)
+---+-------------------------+-------------------+
|id |reason |new_reason |
+---+-------------------------+-------------------+
|1 |Needed better Support |Support Issue |
|2 |Better value from android|Left for Competitor|
|3 |Price was to expensive |Pricing Issue |
|4 |Support problems |Support Issue |
+---+-------------------------+-------------------+
Upvotes: 0