itscarlayall

Reputation: 166

Create a column based on values in a dictionary

I have a dictionary like this:

sample_dict = {
    "A": ["aaaa\.com", "aaaa\.es"],
    "B": ["bbbb\.com", "bbbb\.es", "bbbb\.net"],
    "C": ["ccccc\.com"],
    # many more entries here
}

I would like to add a column in a Spark DataFrame which performs the following operation:

(
    df
    .withColumn(
        "new_col",
        F.when(
            (F.col("filter_col").rlike("aaaa\.com")) | 
            (F.col("filter_col").rlike("aaaa\.es")),
            F.lit("A") 
        )
        .when(
            (F.col("filter_col").rlike("bbbb\.com")) | 
            (F.col("filter_col").rlike("bbbb\.es")) | 
            (F.col("filter_col").rlike("bbbb\.net")),
            F.lit("B") 
        )
        .when(
            (F.col("filter_col").rlike("ccccc\.com")),
            F.lit("C") 
        )
        .otherwise(None)
    )
)

Of course, I would like this to be dynamic, so that when I add new entries to my dictionary, the column automatically picks them up and assigns the new category based on the same rules.

Is this possible?

Upvotes: 0

Views: 706

Answers (2)

Nithish

Reputation: 3232

You can construct the column expression by iterating over the dict, then pass that expression to your withColumn call.


from pyspark.sql import functions as F

# Raw strings keep the regex escape "\." intact without an invalid-escape warning.
sample_dict = {
    "A": [r"aaaa\.com", r"aaaa\.es"],
    "B": [r"bbbb\.com", r"bbbb\.es", r"bbbb\.net"],
    "C": [r"ccccc\.com"],
    # many more entries here
}

data = [("aaaa.com", ), ("aaaa.es", ), ("bbbb.com", ), ("zzzz.com", ), ]

df = spark.createDataFrame(data, ("filter_col", ))

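# Start from the functions module so the first call is F.when(...)
# and every later call chains as Column.when(...).
column_expression = F
for k, conditions in sample_dict.items():
    # OR together all patterns that map to the same label.
    condition_expression = F.col("filter_col").rlike(conditions[0])
    for condition in conditions[1:]:
        condition_expression |= F.col("filter_col").rlike(condition)
    column_expression = column_expression.when(condition_expression, F.lit(k))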

df.withColumn("new_col", column_expression.otherwise(None)).show()

Output

# column_expression is equivalent to writing the expression by hand
Column<'CASE WHEN (RLIKE(filter_col, aaaa\.com) OR RLIKE(filter_col, aaaa\.es)) THEN A WHEN ((RLIKE(filter_col, bbbb\.com) OR RLIKE(filter_col, bbbb\.es)) OR RLIKE(filter_col, bbbb\.net)) THEN B WHEN RLIKE(filter_col, ccccc\.com) THEN C END'>


# DataFrame with the expression applied
+----------+-------+
|filter_col|new_col|
+----------+-------+
|  aaaa.com|      A|
|   aaaa.es|      A|
|  bbbb.com|      B|
|  zzzz.com|   null|
+----------+-------+
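
A possible variation on the loop above, offered as a sketch rather than a tested replacement: since rlike takes a regular expression, the patterns for each label can be joined with | into a single alternation, so each label needs only one rlike call. The helper names combine_patterns and build_label_column are hypothetical, and the PySpark import is deferred so the pure-Python part can be checked without a Spark session.

```python
import re
from functools import reduce


def combine_patterns(label_patterns):
    """Join each label's regex patterns into one alternation, e.g. (?:a)|(?:b)."""
    return {
        label: "|".join(f"(?:{p})" for p in patterns)
        for label, patterns in label_patterns.items()
        if patterns  # skip labels with an empty pattern list
    }


def build_label_column(label_patterns, source_col="filter_col"):
    """Build a CASE WHEN column; rows matching no pattern get null."""
    from pyspark.sql import functions as F  # deferred: requires PySpark

    combined = combine_patterns(label_patterns)
    if not combined:
        return F.lit(None).cast("string")
    # Seeding reduce with the functions module makes the first step F.when(...)
    # and every later step Column.when(...), as in the loop above.
    return reduce(
        lambda expr, item: expr.when(F.col(source_col).rlike(item[1]), F.lit(item[0])),
        combined.items(),
        F,
    )


sample_dict = {
    "A": [r"aaaa\.com", r"aaaa\.es"],
    "B": [r"bbbb\.com", r"bbbb\.es", r"bbbb\.net"],
}
combined = combine_patterns(sample_dict)
# Python's re is used here only as a local sanity check; Spark evaluates the
# pattern with Java regex, which handles this simple syntax the same way.
print(bool(re.search(combined["A"], "www.aaaa.es/path")))
```

With a Spark session available, this would be used as df.withColumn("new_col", build_label_column(sample_dict)).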

Upvotes: 3

Til Piffl

Reputation: 645

If you could alter your column so that you can look for exact matches, you could use df.replace():

from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    Row(filter_col='aaa.de'),
    Row(filter_col='aaa.es'),
    Row(filter_col='bbb.de'),
    Row(filter_col='bbb.es'),
    Row(filter_col='foo'),  # no entry in d, so new_col becomes null
])

d = {
    'aaa.de': 'A',
    'aaa.es': 'A',
    'bbb.de': 'B',
    'bbb.es': 'B',
}

(
    df
    .withColumn('new_col', F.col('filter_col'))
    .withColumn('new_col', F.when(F.col('new_col').isin(list(d.keys())), F.col('new_col')))
    .replace(d, None, subset='new_col')
    .show()
)

# Output:
+----------+-------+
|filter_col|new_col|
+----------+-------+
|    aaa.de|      A|
|    aaa.es|      A|
|    bbb.de|      B|
|    bbb.es|      B|
|       foo|   null|
+----------+-------+

There might be a more performant way to replace values not mentioned in your dictionary with "None" (your "otherwise" condition).
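
One candidate for that more performant route, sketched under assumptions: build a literal map column from the dictionary and index it with filter_col, which avoids both the isin filter and the replace call. Keys missing from the map generally come back as null, though some Spark 3.x releases raise an error instead when ANSI mode is enabled, so check the behaviour on your version. flatten_mapping and lookup_column are hypothetical helper names, and the PySpark import is deferred so the pure-Python part runs without a Spark session.

```python
from itertools import chain


def flatten_mapping(mapping):
    """Turn {'k1': 'v1', 'k2': 'v2'} into ['k1', 'v1', 'k2', 'v2']."""
    return list(chain.from_iterable(mapping.items()))


def lookup_column(mapping, source_col="filter_col"):
    """Return a column that looks up source_col in a literal map column."""
    from pyspark.sql import functions as F  # deferred: requires PySpark

    # create_map expects alternating key and value columns.
    map_col = F.create_map(*[F.lit(x) for x in flatten_mapping(mapping)])
    return map_col[F.col(source_col)]


d = {"aaa.de": "A", "aaa.es": "A", "bbb.de": "B", "bbb.es": "B"}
flat = flatten_mapping(d)
print(flat)
```

With a Spark session available, this would be used as df.withColumn('new_col', lookup_column(d)). Like df.replace(), it only handles exact matches.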


Update:

If the reformatting is not possible, you would have to iterate through your dict:

from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    Row(filter_col='aaa.de/foo'),
    Row(filter_col='aaa.es/foo'),
    Row(filter_col='bbb.de/foo'),
    Row(filter_col='bbb.es/foo'),
    Row(filter_col='foo'),
])

# Raw strings keep the regex escape "\." intact without an invalid-escape warning.
d = {
    r'aaa\.de': 'A',
    r'aaa\.es': 'A',
    r'bbb\.de': 'B',
    r'bbb\.es': 'B',
}

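# Start with an all-null string column, then fill it in one pattern at a time.
df = df.withColumn('new_col', F.lit(None).cast('string'))
for k, v in d.items():
    # Rows that do not match k keep their current value, so if several
    # patterns match the same row, the last one in the dict wins.
    df = df.withColumn('new_col', F.when(F.col('filter_col').rlike(k), v).otherwise(F.col('new_col')))
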
df.show()
# Output
+----------+-------+
|filter_col|new_col|
+----------+-------+
|aaa.de/foo|      A|
|aaa.es/foo|      A|
|bbb.de/foo|      B|
|bbb.es/foo|      B|
|       foo|   null|
+----------+-------+
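
The loop above adds one withColumn per pattern, and when several patterns match the same row, the last one in the dict wins. A minimal alternative sketch, assuming the label-to-patterns layout from the question: invert that dictionary into pattern-to-label pairs, then combine one F.when per pattern with F.coalesce, so the first match wins and only a single column expression is added. invert_label_patterns and first_match_column are hypothetical names, and the PySpark import is deferred so the inversion can be checked without a Spark session.

```python
def invert_label_patterns(label_patterns):
    """Turn {label: [pattern, ...]} into {pattern: label}, keeping dict order."""
    return {
        pattern: label
        for label, patterns in label_patterns.items()
        for pattern in patterns
    }


def first_match_column(pattern_labels, source_col="filter_col"):
    """Return the label of the first matching pattern, or null if none match."""
    from pyspark.sql import functions as F  # deferred: requires PySpark

    if not pattern_labels:
        return F.lit(None).cast("string")
    # Each when() without otherwise() yields null on a miss, so coalesce
    # picks the first branch whose pattern matched.
    branches = [
        F.when(F.col(source_col).rlike(pattern), F.lit(label))
        for pattern, label in pattern_labels.items()
    ]
    return F.coalesce(*branches)


sample_dict = {
    "A": [r"aaa\.de", r"aaa\.es"],
    "B": [r"bbb\.de", r"bbb\.es"],
}
pattern_labels = invert_label_patterns(sample_dict)
print(pattern_labels)
```

With a Spark session available, this would be used as df.withColumn('new_col', first_match_column(pattern_labels)).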

Upvotes: 1
