Mike Solty

Reputation: 21

PySpark: Check if value in col is like a key in a dict

I would like to take my dictionary of keywords and check a column in a PySpark DataFrame to see whether any keyword appears in it; if so, return the corresponding value from the dictionary in a new column.

The problem looks like this:

myDict = {
'price': 'Pricing Issue',
'support': 'Support Issue',
'android': 'Left for Competitor'
}

df = sc.parallelize([('1','Needed better Support'),('2','Better value from android'),('3','Price was to expensive'),('4','Support problems')]).toDF(['id','reason'])

+-----+-------------------------+
| id  |reason                   |
+-----+-------------------------+
|1    |Needed better Support    |
|2    |Better value from android|
|3    |Price was to expensive   |
|4    |Support problems         |
+-----+-------------------------+

The end result that I am looking for is this:

+-----+-------------------------+-------------------+
| id  |reason                   |new_reason         |
+-----+-------------------------+-------------------+
|1    |Needed better Support    |Support Issue      |
|2    |Better value from android|Left for Competitor|
|3    |Price was to expensive   |Pricing Issue      |
|4    |Support problems         |Support Issue      |
+-----+-------------------------+-------------------+

What's the best way to build an efficient function to do this in PySpark?

Upvotes: 0

Views: 1765

Answers (2)

blackbishop

Reputation: 32700

You can use when expressions to check whether the column reason matches the dict keys. You can generate the when expressions dynamically with Python's functools.reduce, folding over myDict.keys():

from functools import reduce
from pyspark.sql import functions as F

df2 = df.withColumn(
    "new_reason",
    # chain one when() per keyword; rows matching no keyword get null
    reduce(
        lambda c, k: c.when(F.lower(F.col("reason")).rlike(rf"\b{k.lower()}\b"), myDict[k]),
        myDict.keys(),
        F
    )
)

df2.show(truncate=False)
#+---+-------------------------+-------------------+
#|id |reason                   |new_reason         |
#+---+-------------------------+-------------------+
#|1  |Needed better Support    |Support Issue      |
#|2  |Better value from android|Left for Competitor|
#|3  |Price was to expensive   |Pricing Issue      |
#|4  |Support problems         |Support Issue      |
#+---+-------------------------+-------------------+
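
If you also want a default label for rows that match none of the keywords (instead of leaving new_reason null), one option is to finish the chain with otherwise. A minimal sketch on top of the same expression; the "Other" label and the new_reason_col name are just placeholders:

from functools import reduce
from pyspark.sql import functions as F

# same chained when() expression, with a fallback label for non-matching rows
new_reason_col = reduce(
    lambda c, k: c.when(F.lower(F.col("reason")).rlike(rf"\b{k.lower()}\b"), myDict[k]),
    myDict.keys(),
    F
).otherwise("Other")  # placeholder default label

df2 = df.withColumn("new_reason", new_reason_col)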

Upvotes: 2

mck

Reputation: 42392

You can create a keywords dataframe and join it to the original dataframe using an rlike condition. I added word boundaries (\b, escaped as \\\\b inside the expression string) before and after each keyword so that only whole words are matched and there are no partial matches (e.g. "pineapple" matching "apple").

import pyspark.sql.functions as F

# build a small lookup dataframe: one row per (keyword, label) pair
keywords = spark.createDataFrame([[k, v] for (k, v) in myDict.items()]).toDF('key', 'new_reason')

# left join on a word-boundary regex match, then drop the helper column
result = df.join(
    keywords,
    F.expr("lower(reason) rlike '\\\\b' || lower(key) || '\\\\b'"),
    'left'
).drop('key')

result.show(truncate=False)
+---+-------------------------+-------------------+
|id |reason                   |new_reason         |
+---+-------------------------+-------------------+
|1  |Needed better Support    |Support Issue      |
|2  |Better value from android|Left for Competitor|
|3  |Price was to expensive   |Pricing Issue      |
|4  |Support problems         |Support Issue      |
+---+-------------------------+-------------------+
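
Since the keywords dataframe here is tiny, you can also add a broadcast hint so the non-equi join is planned as a broadcast nested-loop join rather than something heavier; whether the hint changes the plan depends on your Spark version and settings. A small variation on the join above:

import pyspark.sql.functions as F

# broadcast the small lookup table; the join condition itself is unchanged
result = df.join(
    F.broadcast(keywords),
    F.expr("lower(reason) rlike '\\\\b' || lower(key) || '\\\\b'"),
    'left'
).drop('key')

Note that, unlike the chained when approach, a reason that matches more than one keyword will produce one output row per matching keyword with this join.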

Upvotes: 0
