mathfish

Reputation: 194

Correct Way to Specify User-Defined Function in PySpark Pandas UDF

I am using PySpark 2.4.2, so per the docs for this version one can do the following to create a GROUPED_MAP:

from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],("id", "v"))

@pandas_udf(returnType="id long, v double", functionType=PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()

This works, but you can no longer call subtract_mean as a normal Python function that takes a pandas DataFrame. However, if you do this instead, it will work:

def subtract_mean(pdf):
    v = pdf.v
    return pdf.assign(v=v - v.mean())

sub_spark = pandas_udf(f=subtract_mean, returnType="id long, v double", functionType=PandasUDFType.GROUPED_MAP)

df.groupby("id").apply(sub_spark).show()

Now you can call subtract_mean from Python, passing it a pandas DataFrame. How does one do this using the annotation (decorator) approach? It is not clear from the docs. Which function is annotated, and which function is given for the f parameter?

Upvotes: 0

Views: 1580

Answers (1)

mck

Reputation: 42332

The two ways of specifying a UDF are equivalent; the decorator approach is just a neater way of doing the same thing. The function that follows the decorator is what gets passed as the f parameter.
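In other words, the decorator form in your first snippet is (roughly) shorthand for the manual call in your second snippet. A minimal sketch of the equivalence, reusing your subtract_mean:

from pyspark.sql.functions import pandas_udf, PandasUDFType

# decorator form
@pandas_udf(returnType="id long, v double", functionType=PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf.v
    return pdf.assign(v=v - v.mean())

# ...is roughly the same as defining the plain function and then rebinding the name:
def subtract_mean(pdf):
    v = pdf.v
    return pdf.assign(v=v - v.mean())

subtract_mean = pandas_udf(f=subtract_mean, returnType="id long, v double",
                           functionType=PandasUDFType.GROUPED_MAP)

The practical difference is that after the decorator runs, the name subtract_mean refers to the wrapped UDF, so the plain pandas function is no longer reachable under its own name (except via __wrapped__, as below).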

As described in this answer, you can use subtract_mean.__wrapped__ to get back the original, undecorated function. The second approach in your question is more readable, though; using __wrapped__ makes the code harder to follow. But if it's just for unit-testing purposes, it should be fine.
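For example, a rough sketch of how a unit test could call the undecorated function directly (this assumes the decorated subtract_mean from your first snippet and pandas available on the driver):

import pandas as pd

# build a small pandas DataFrame and apply the original function, bypassing Spark
pdf = pd.DataFrame({"id": [1, 1, 2], "v": [1.0, 2.0, 3.0]})
print(subtract_mean.__wrapped__(pdf))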

Upvotes: 1
