Reputation: 31
I have a simple Spark DataFrame with two string columns: one called id and the other called name. I also have a Python function called string_replacement that does some string manipulation. I've defined a wrapper UDF that encompasses string_replacement and applies it to every row of the DataFrame. Only the name column is passed to the string manipulation function. Here is the code:
# Import libraries
from pyspark.sql import *
import pyspark.sql.functions as f
from pyspark.sql.types import *
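# Create (or reuse) a SparkSession; the pyspark shell and most notebooks already define spark
spark = SparkSession.builder.getOrCreate()
# Create Example Dataframe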
row1 = Row(id='123456', name='Computer Science')
df = spark.createDataFrame([row1])
# Print the dataframe
df.show()
# Define function that does some string operations
def string_replacement(input_string):
    string = input_string
    string = string.replace('Computer', 'Computer x')
    string = string.replace('Science', 'Science x')
    return string
# Define wrapper function to turn into UDF
def wrapper_func(row):
    temp = row[1]
    temp = string_replacement(temp)
    row[1] = temp
    return row
# Create the schema for the resulting data frame
output_schema = StructType([StructField('id', StringType(), True),
                            StructField('name', StringType(), True)])
# UDF to apply the wrapper function to the dataframe
new_udf=f.udf(lambda z: wrapper_func(z), output_schema)
cols=df.columns
new_df=df.select(new_udf(f.array(cols)).alias('results')).select(f.col('results.*'))
new_df.show(truncate = False)
The function takes the word Computer and turns it into Computer x. It does the same for the word Science.
The original dataframe looks like this:
+------+----------------+
| id| name|
+------+----------------+
|123456|Computer Science|
+------+----------------+
After applying the function, it looks like this:
+------+------------------------+
|id |name |
+------+------------------------+
|123456|Computer x x Science x x|
+------+------------------------+
As you can tell from the doubled x's, the function has run twice, the second time on the output of the first run. How can I avoid this behavior?
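A quick plain-Python check (no Spark involved) supports that reading: applying the question's string_replacement once gives the expected value, and applying it again to that output reproduces the doubled result in the table exactly. The function logic is taken from the question; the names once and twice are only for illustration.

```python
def string_replacement(input_string):
    # Same replacements as the question's function
    result = input_string.replace('Computer', 'Computer x')
    return result.replace('Science', 'Science x')

once = string_replacement('Computer Science')
twice = string_replacement(once)  # run again on the first run's output

print(once)   # Computer x Science x
print(twice)  # Computer x x Science x x
```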
Interestingly, if I don't expand the resulting struct with results.*, it looks fine:
new_df=df.select(new_udf(f.array(cols)).alias('results'))
gives
+-----------------------------+
|results |
+-----------------------------+
|[123456,Computer x Science x]|
+-----------------------------+
Upvotes: 3
Views: 1290
Reputation: 31
Thanks cylim. This also seems to work, but your approach is clearer.
def string_replacement(string1, string2):
    string2 = string2.replace('Computer', 'Computer x')
    string2 = string2.replace('Science', 'Science x')
    return string1, string2
output_schema = StructType([StructField('id', StringType(), True), StructField('name', StringType(), True)])
new_udf = f.udf(string_replacement, output_schema)
cols = df.columns
df.select(new_udf(f.col('id'), f.col('name')).alias('results')).select(f.col('results.*')).show(truncate=False)
Upvotes: 0
Reputation: 542
Using star expansion seems to result in the UDF being run once for each expanded field, as the physical plan shows (with pyspark.sql.functions imported as F):
import pyspark.sql.functions as F
df.select(new_udf(F.array(cols)).alias('results')).select(F.col('results.*')).explain()
# == Physical Plan ==
# *(1) Project [pythonUDF1#109.id AS id#104, pythonUDF1#109.name AS name#105]
# +- BatchEvalPython [<lambda>(array(id#0, name#1)), <lambda>(array(id#0, name#1))], [id#0, name#1, pythonUDF0#108, pythonUDF1#109]
# +- Scan ExistingRDD[id#0,name#1]
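Running the UDF twice would not by itself double the x's, since each call should start from the original name. One plausible explanation, which is an assumption about PySpark's Python worker rather than something the plan proves, is that both UDF calls are handed the same Python list for the shared array(id, name) argument, so wrapper_func's in-place assignment row[1] = ... leaks from the first call into the second. The plain-Python sketch below mimics that; shared_input, first and second are hypothetical names standing in for the shared argument and the two UDF results.

```python
# Plain-Python simulation, no Spark needed.
# ASSUMPTION: both star-expanded UDF calls receive the same list object.

def string_replacement(input_string):
    result = input_string.replace('Computer', 'Computer x')
    return result.replace('Science', 'Science x')

def wrapper_func(row):
    # Same logic as the question: assigns into the list it was given
    row[1] = string_replacement(row[1])
    return row

shared_input = ['123456', 'Computer Science']  # hypothetical stand-in for array(id, name)
first = wrapper_func(shared_input)   # first UDF call
second = wrapper_func(shared_input)  # second UDF call sees the mutated list

# Both calls returned the one shared list, which was modified twice
print(first is second)  # True
print(second[1])        # Computer x x Science x x
```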
If you want to keep your current code structure, you can solve the problem by wrapping the UDF result in an array and exploding it:
df.select(F.explode(F.array(new_udf(F.array(cols)))).alias('results')).select(F.col('results.*')).show(truncate=False)
# +------+--------------------+
# |id |name |
# +------+--------------------+
# |123456|Computer x Science x|
# +------+--------------------+
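Another way to keep the row-based structure, assuming the doubling comes from wrapper_func mutating its input list in place, is to build a new list instead of assigning to row[1]. The sketch below is plain Python; passing this wrapper_func to f.udf with the question's output_schema should then give a single x per word even with results.*, although the plan above suggests the UDF would still be evaluated once per expanded field, so the explode approach or a per-column UDF avoids the duplicated work.

```python
def string_replacement(input_string):
    result = input_string.replace('Computer', 'Computer x')
    return result.replace('Science', 'Science x')

def wrapper_func(row):
    # Return a fresh list; the input row is left unchanged
    return [row[0], string_replacement(row[1])]

shared_input = ['123456', 'Computer Science']  # hypothetical stand-in for array(id, name)
first = wrapper_func(shared_input)
second = wrapper_func(shared_input)  # same input, same result

print(second)        # ['123456', 'Computer x Science x']
print(shared_input)  # ['123456', 'Computer Science']
```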
Depending on your use case, the code is more readable if you re-implement the UDF to process only a specific column per row rather than the whole row:
def rep_str(string):
    res = string
    res = res.replace('Computer', 'Computer x')
    res = res.replace('Science', 'Science x')
    return res
rep_str_udf = F.udf(rep_str, StringType())
df.withColumn('new_name', rep_str_udf(df.name)).show()
# +------+----------------+--------------------+
# | id| name| new_name|
# +------+----------------+--------------------+
# |123456|Computer Science|Computer x Science x|
# +------+----------------+--------------------+
Upvotes: 1