Reputation: 166
I have a dictionary like this:
sample_dict = {
    "A": ["aaaa\.com", "aaaa\.es"],
    "B": ["bbbb\.com", "bbbb\.es", "bbbb\.net"],
    "C": ["ccccc\.com"],
    # many more entries here
}
I would like to add a column in a Spark DataFrame which performs the following operation:
(
    df
    .withColumn(
        "new_col",
        F.when(
            (F.col("filter_col").rlike("aaaa\.com")) |
            (F.col("filter_col").rlike("aaaa\.es")),
            F.lit("A")
        )
        .when(
            (F.col("filter_col").rlike("bbbb\.com")) |
            (F.col("filter_col").rlike("bbbb\.es")) |
            (F.col("filter_col").rlike("bbbb\.net")),
            F.lit("B")
        )
        .when(
            (F.col("filter_col").rlike("ccccc\.com")),
            F.lit("C")
        )
        .otherwise(None)
    )
)
But, of course, I would like this to be dynamic, so that when I add new entries to the dictionary the column automatically picks them up and assigns the new category based on the rules.
Is this possible?
Upvotes: 0
Views: 706
Reputation: 3232
You can construct the column expression by iterating over the dict and pass the resulting expression to your withColumn call.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

sample_dict = {
    "A": ["aaaa\.com", "aaaa\.es"],
    "B": ["bbbb\.com", "bbbb\.es", "bbbb\.net"],
    "C": ["ccccc\.com"],
    # many more entries here
}

data = [("aaaa.com", ), ("aaaa.es", ), ("bbbb.com", ), ("zzzz.com", ), ]
df = spark.createDataFrame(data, ("filter_col", ))
# Start from the functions module so the first .when call is F.when;
# every later iteration chains .when onto the Column built so far.
column_expression = F
for k, conditions in sample_dict.items():
    # OR together one rlike per pattern for this key
    condition_expression = F.col("filter_col").rlike(conditions[0])
    for condition in conditions[1:]:
        condition_expression |= F.col("filter_col").rlike(condition)
    column_expression = column_expression.when(condition_expression, F.lit(k))

df.withColumn("new_col", column_expression.otherwise(None)).show()
# column_expression is equivalent to writing the expression by hand:
Column<'CASE WHEN (RLIKE(filter_col, aaaa\.com) OR RLIKE(filter_col, aaaa\.es)) THEN A WHEN ((RLIKE(filter_col, bbbb\.com) OR RLIKE(filter_col, bbbb\.es)) OR RLIKE(filter_col, bbbb\.net)) THEN B WHEN RLIKE(filter_col, ccccc\.com) THEN C END'>
## DataFrame with the expression applied
+----------+-------+
|filter_col|new_col|
+----------+-------+
| aaaa.com| A|
| aaaa.es| A|
| bbbb.com| B|
| zzzz.com| null|
+----------+-------+
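For reference, the same construction can be written more compactly with functools.reduce. This is just a minimal sketch of that variant, assuming the sample_dict and df defined above:

from functools import reduce
from pyspark.sql import functions as F

def any_pattern_matches(patterns):
    # OR together one rlike per pattern, exactly as the inner loop above does
    return reduce(lambda a, b: a | b, (F.col("filter_col").rlike(p) for p in patterns))

# Start the fold from the functions module so the first .when is F.when
column_expression = reduce(
    lambda acc, item: acc.when(any_pattern_matches(item[1]), F.lit(item[0])),
    sample_dict.items(),
    F,
)

df.withColumn("new_col", column_expression.otherwise(None)).show()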
Upvotes: 3
Reputation: 645
If you could alter your column so that you can look for exact matches, you could use df.replace():
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
    Row(filter_col='aaa.de'),
    Row(filter_col='aaa.es'),
    Row(filter_col='bbb.de'),
    Row(filter_col='bbb.es'),
    Row(filter_col='foo'),
])
d = {
    'aaa.de': 'A',
    'aaa.es': 'A',
    'bbb.de': 'B',
    'bbb.es': 'B',
}
(
    df
    .withColumn('new_col', F.col('filter_col'))
    # keep only values that appear in the dict; everything else becomes null
    .withColumn('new_col', F.when(F.col('new_col').isin(list(d.keys())), F.col('new_col')))
    .replace(d, None, subset='new_col')
    .show()
)
# Output:
+----------+-------+
|filter_col|new_col|
+----------+-------+
| aaa.de| A|
| aaa.es| A|
| bbb.de| B|
| bbb.es| B|
| foo| null|
+----------+-------+
There might be a more performant way to set the values not covered by your dictionary to null (your otherwise condition).
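One possibility (just a sketch, not benchmarked) is to turn the dict into a literal map column and index into it; a map lookup returns null for keys that are not present, which covers the otherwise case without the extra isin step:

from itertools import chain
from pyspark.sql import functions as F

# Build a literal map column {filter value -> label} from the dict d above
mapping = F.create_map(*chain.from_iterable((F.lit(k), F.lit(v)) for k, v in d.items()))

# Indexing the map with the column yields the label, or null if the key is missing
df.withColumn('new_col', mapping[F.col('filter_col')]).show()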
If the reformatting is not possible, you would have to iterate through your dict:
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
    Row(filter_col='aaa.de/foo'),
    Row(filter_col='aaa.es/foo'),
    Row(filter_col='bbb.de/foo'),
    Row(filter_col='bbb.es/foo'),
    Row(filter_col='foo'),
])
d = {
    'aaa\.de': 'A',
    'aaa\.es': 'A',
    'bbb\.de': 'B',
    'bbb\.es': 'B',
}
df = df.withColumn('new_col', F.lit(None).cast('string'))
for k, v in d.items():
    # overwrite new_col where the pattern matches, otherwise keep the previous value
    df = df.withColumn('new_col', F.when(F.col('filter_col').rlike(k), v).otherwise(F.col('new_col')))

df.show()
# Output
+----------+-------+
|filter_col|new_col|
+----------+-------+
|aaa.de/foo| A|
|aaa.es/foo| A|
|bbb.de/foo| B|
|bbb.es/foo| B|
| foo| null|
+----------+-------+
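A variant of the same loop (again only a sketch along these lines): group the patterns by their label and join them with |, so each label is checked with a single rlike and only one when per label ends up in the expression:

from collections import defaultdict
from pyspark.sql import functions as F

# Group the regex patterns from d by their label
patterns_by_label = defaultdict(list)
for pattern, label in d.items():
    patterns_by_label[label].append(pattern)

# Build one nested when per label instead of one withColumn per pattern
new_col = F.lit(None).cast('string')
for label, patterns in patterns_by_label.items():
    new_col = F.when(F.col('filter_col').rlike('|'.join(patterns)), label).otherwise(new_col)

df.withColumn('new_col', new_col).show()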
Upvotes: 1