Reputation: 194
I am using pyspark 2.4.2, so the per the docs for this version one can do this to create a GROUPED_MAP:
from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],("id", "v"))
@pandas_udf(returnType="id long, v double", functionType=PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").apply(subtract_mean).show()
This works but you cannot call subtract_mean
as a normal python function that is passed a pandas DataFrame. But if you do this, it will work:
def subtract_mean(pdf):
v = pdf.v
return pdf.assign(v=v - v.mean())
sub_spark = pandas_udf(f=subtract_mean, returnType="id long, v double", functionType=PandasUDFType.GROUPED_MAP)
df.groupby("id").apply(sub_spark).show()
Now you can call subtract_mean
from python passing a pandas DataFrame. How does one do this using the annotation approach? It is not clear from the docs how to do this. What function is annotated and what function is given for the f
parameter?
Upvotes: 0
Views: 1580
Reputation: 42332
The two ways are equivalent for specifying an UDF. The decorator approach is just a neater way of doing things. The function that follows the decorator is passed as the f
parameter.
As described in this answer, you can use subtract_mean.__wrapped__
to get back the original undecorated function. The second approach in your question is more readble though. Using __wrapped__
makes the code less readable. But if it's just for unit testing purposes, it should be fine.
Upvotes: 1