Reputation: 307
I have a data frame and a function that I want to run on every cell in my data frame:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

def foo(x):
    # does stuff to x
    return x

foo_udf = udf(lambda x: foo(x), StringType())

df = df.withColumn("col1", foo_udf(col("col1"))) \
       .withColumn("col2", foo_udf(col("col2"))) \
       .withColumn("col3", foo_udf(col("col3")))
The function simply modifies the data passed in and returns a new value to replace the passed-in value.
However, there may be instances where an error occurs. For those instances I have another column, col4, which should store a boolean indicating whether or not the udf failed for that row.
My issue is that when this happens, I have no way of accessing col4 for the given row.
Upvotes: 1
Views: 822
Reputation: 14895
You can only return/change a single column from a UDF. However, this column can be a StructType containing the payload and an error flag. You can then "unpack" the struct column into two (or more) normal columns.
from pyspark.sql import functions as F
from pyspark.sql import types as T

# some test data
data = [['A', 4],
        ['B', 2],
        ['C', 5]]
df = spark.createDataFrame(data, ["id", "col1"])

# the udf
def foo(x):
    if x == 5:
        error = True
    else:
        error = False
    return [x, error]

foo_udf = F.udf(lambda x: foo(x), returnType=T.StructType([
    T.StructField("x", T.StringType(), False),
    T.StructField("error", T.BooleanType(), False)
]))

# calling the udf and unpacking the return values
df.withColumn("col1", foo_udf("col1")) \
    .withColumn("error", F.col("col1.error")) \
    .withColumn("col1", F.col("col1.x")) \
    .show()
Output:
+---+----+-----+
| id|col1|error|
+---+----+-----+
| A| 4|false|
| B| 2|false|
| C| 5| true|
+---+----+-----+
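If the error flag needs to come from an actual exception (as in the question) rather than a value check, the same struct pattern can wrap the original function in a try/except. A minimal sketch, assuming a hypothetical safe_foo wrapper around the question's foo:

from pyspark.sql import functions as F
from pyspark.sql import types as T

def safe_foo(x):
    # hypothetical wrapper: return (result, error flag) instead of raising
    try:
        return [foo(x), False]
    except Exception:
        # keep the original value and mark the row as failed
        return [x, True]

safe_foo_udf = F.udf(safe_foo, returnType=T.StructType([
    T.StructField("x", T.StringType(), False),
    T.StructField("error", T.BooleanType(), False)
]))

df = df.withColumn("col1", safe_foo_udf("col1")) \
       .withColumn("error", F.col("col1.error")) \
       .withColumn("col1", F.col("col1.x"))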
Upvotes: 1
Reputation: 687
You can do this on a partition level with mapPartitions. I will use Fugue, which provides an easier interface to bring this logic to Spark.
First some setup:
from typing import List, Dict, Any, Iterable
import pandas as pd
def foo(x):
    if x == "E":
        raise ValueError()
    return x + "_ran"

def logic(df: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    for row in df:
        try:
            x = foo(row["col1"])
            y = foo(row["col2"])
            z = foo(row["col3"])
            # if it reaches here, we can update all
            row["col1"] = x
            row["col2"] = y
            row["col3"] = z
            row["col4"] = False
        except:
            row["col4"] = True
    return df
foo() is your original function and logic() is a wrapper that only updates the columns if every foo() call succeeds. The type annotations on logic() guide Fugue in converting the data when applying the function. From here we can use Fugue's transform() to test the logic on Pandas.
df = pd.DataFrame({"col1": ["A", "B", "C"], "col2": ["A", "B", "C"], "col3": ["D", "E", "F"]})
from fugue import transform
transform(df, logic, schema="*, col4:boolean")
The schema is a requirement for Spark operations; this minimal expression is all Fugue needs, and it handles the rest. On Pandas we get this result:
col1 col2 col3 col4
A_ran A_ran D_ran False
B B E True
C_ran C_ran F_ran False
Now we can bring it to Spark. We just need to supply a SparkSession.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(df)
transform(sdf, logic, schema="*, col4:boolean", engine=spark).show()
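For comparison, the same partition-level idea can also be written in plain PySpark without Fugue. The following is only a rough sketch using mapInPandas (available in Spark 3.0+), with logic_pandas as a hypothetical translation of logic():

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(df)  # the Pandas DataFrame from above

def logic_pandas(batches):
    # same row-wise try/except logic as logic(), applied per pandas batch
    for pdf in batches:
        out = pdf.copy()
        col4 = []
        for i, row in out.iterrows():
            try:
                x, y, z = foo(row["col1"]), foo(row["col2"]), foo(row["col3"])
                # only update the row if every foo() call succeeded
                out.at[i, "col1"], out.at[i, "col2"], out.at[i, "col3"] = x, y, z
                col4.append(False)
            except Exception:
                col4.append(True)
        out["col4"] = col4
        yield out

sdf.mapInPandas(
    logic_pandas,
    schema="col1 string, col2 string, col3 string, col4 boolean"
).show()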
Upvotes: 2