TheShield

Reputation: 307

Modify Different Pyspark Column on Exception in UDF

I have a data frame and a function that I want to run on every cell in my data frame:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def foo(x):
    # does stuff to x
    return x

foo_udf = udf(lambda x: foo(x), StringType())

df = (df.withColumn("col1", foo_udf(col("col1")))
        .withColumn("col2", foo_udf(col("col2")))
        .withColumn("col3", foo_udf(col("col3"))))

It simply modifies the data passed in and returns a new value to replace it.

However, there may be instances where an error occurs. For those rows, I have another column, col4, which should store a boolean indicating whether or not the UDF failed.

My issue is that when this occurs, I have no way of accessing col4 for that row from inside the UDF.
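For example, a try/except wrapper around foo (foo_safe below is just for illustration) keeps the job from failing, but it still returns only a single value, so it has nowhere to record the failure:

def foo_safe(x):
    try:
        return foo(x)
    except Exception:
        # the error is swallowed here; col4 cannot be set from inside the UDF
        return x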

Upvotes: 1

Views: 822

Answers (2)

werner

Reputation: 14895

You can only return/change a single column from a UDF. However, that column can be a StructType containing both the payload and an error flag. You can then "unpack" the struct column into two (or more) normal columns.

from pyspark.sql import functions as F
from pyspark.sql import types as T

# some test data
data = [['A', 4],
        ['B', 2],
        ['C', 5]]
df = spark.createDataFrame(data, ["id", "col1"])

# the udf: returns the value together with an error flag
def foo(x):
    error = x == 5
    return [x, error]

foo_udf = F.udf(lambda x: foo(x), returnType=T.StructType([
    T.StructField("x", T.StringType(), False),
    T.StructField("error", T.BooleanType(), False)
]))

# calling the udf and unpacking the returned struct into normal columns
df.withColumn("col1", foo_udf("col1")) \
    .withColumn("error", F.col("col1.error")) \
    .withColumn("col1", F.col("col1.x")) \
    .show()

Output:

+---+----+-----+
| id|col1|error|
+---+----+-----+
|  A|   4|false|
|  B|   2|false|
|  C|   5| true|
+---+----+-----+
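The same unpacking can also be written as a single select over a temporary struct column (res is just an illustrative name), assuming the same df and foo_udf as above:

df.withColumn("res", foo_udf("col1")) \
    .select("id", F.col("res.x").alias("col1"), F.col("res.error").alias("error")) \
    .show()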

Upvotes: 1

Kevin Kho

Reputation: 687

You can do this at the partition level with mapPartitions. I will use Fugue, which provides an easier interface to bring this to Spark.

First some setup:

from typing import Any, Dict, List
import pandas as pd

def foo(x):
    if x == "E":
        raise ValueError()
    return x + "_ran"

def logic(df: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    for row in df:
        try:
            x = foo(row["col1"])
            y = foo(row["col2"])
            z = foo(row["col3"])
            # if it reaches here, every call succeeded, so update all columns
            row["col1"] = x
            row["col2"] = y
            row["col3"] = z
            row["col4"] = False
        except Exception:
            row["col4"] = True
    return df

foo() is your original function and logic() is a wrapper that only updates the columns if every foo() call succeeds. The type annotations guide Fugue in applying the conversions. From here we can use Fugue's transform() to test the logic on Pandas.

df = pd.DataFrame({"col1": ["A", "B", "C"], "col2": ["A", "B", "C"], "col3": ["D", "E", "F"]})

from fugue import transform
transform(df, logic, schema="*, col4:boolean")

The schema is a requirement for Spark operations; this minimal expression is all Fugue needs, and it handles the rest. We get this result:

col1    col2    col3    col4
A_ran   A_ran   D_ran   False
B       B       E       True
C_ran   C_ran   F_ran   False

Since it works, we can bring it to Spark. We just need to supply a SparkSession as the engine.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(df)
transform(sdf, logic, schema="*, col4:boolean", engine=spark).show()
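For comparison, here is a rough sketch of the same partition-level idea in plain PySpark with mapPartitions, without Fugue; out_schema and run_partition are illustrative names, and logic() is the function defined above:

from pyspark.sql import types as T

# output schema for the rebuilt DataFrame
out_schema = T.StructType([
    T.StructField("col1", T.StringType()),
    T.StructField("col2", T.StringType()),
    T.StructField("col3", T.StringType()),
    T.StructField("col4", T.BooleanType()),
])

def run_partition(rows):
    # materialize the partition as dicts, run logic(), and emit tuples in schema order
    for d in logic([r.asDict() for r in rows]):
        yield (d["col1"], d["col2"], d["col3"], d["col4"])

spark.createDataFrame(sdf.rdd.mapPartitions(run_partition), schema=out_schema).show()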

Upvotes: 2
