Reputation: 1247
I am using a Pandas UDF in PySpark.
I have a main file __main__.py with:
from pyspark.sql import SparkSession
from run_udf import compute

def main():
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    df = compute(df)
    df.show()
    spark.stop()

if __name__ == "__main__":
    main()
And a run_udf.py file that contains my UDF function and another function (that multiplies a single variable by 2):
from pyspark.sql.functions import pandas_udf, PandasUDFType

def multi_by_2(x):
    return 2 * x

def compute(df):
    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=multi_by_2(v) - v.mean())
    df = df.groupby("id").apply(subtract_mean)
    return df
Running the main file gives the following error: "No module named 'run_udf'". In this configuration, subtract_mean() does not seem to be able to access the function multi_by_2(). I found two workarounds, but I don't know whether they follow best practices:
Method 1: Move the function inside compute. This is not ideal, as I will have to copy the function each time I write another pandas_udf() function, so we lose the concept of a 'reusable' function.
def compute(df):
    def multi_by_2(x):
        return 2 * x

    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=multi_by_2(v) - v.mean())
    df = df.groupby("id").apply(subtract_mean)
    return df
Method 2: Pass the multiplying function as a parameter of compute.
__main__.py
from pyspark.sql import SparkSession
from run_udf import compute

def multi_by_2(x):
    return 2 * x

def main():
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    df = compute(df, multi_by_2)
    df.show()
    spark.stop()

if __name__ == "__main__":
    main()
run_udf.py
from pyspark.sql.functions import pandas_udf, PandasUDFType

def compute(df, multi_by_2):
    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=multi_by_2(v) - v.mean())
    df = df.groupby("id").apply(subtract_mean)
    return df
The two solutions I found seem to be a bit hacky. Is there any better way to tackle this issue?
Upvotes: 3
Views: 1354
Reputation: 33
I know this reply comes a while after you've posted the question but I hope it can still be helpful!
What is the reason you want to wrap this in a nested function? Also, as far as I am aware, calling a function with a Spark DataFrame as the argument is not commonly done, so maybe you could try something like the following for your main script:
from pyspark.sql import SparkSession
from run_udf import subtract_mean_udf

def main():
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    df = df.groupby("id").apply(subtract_mean_udf)
    df.show()
    spark.stop()

if __name__ == "__main__":
    main()
And the following for the run_udf.py script:
from pyspark.sql.functions import pandas_udf, PandasUDFType

def multi_by_2(x):
    return 2 * x

# The schema is given as a string because no DataFrame named df exists in this module.
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean_udf(pdf):
    # pdf is a pandas.DataFrame
    return pdf.assign(v=multi_by_2(pdf.v) - pdf.v.mean())
Most of this information is taken from a Databricks notebook on Pandas UDFs.
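As for the error itself, my understanding (an assumption, I have not checked it against your setup) is that Spark pickles the UDF on the driver, and the workers then have to import any module-level function it refers to by name. The sketch below reproduces that kind of failure with the standard-library pickle module standing in for Spark's serializer. The names run_udf_demo and demo_pickle_by_reference are made up for illustration.

```python
import importlib
import os
import pickle
import sys
import tempfile


def demo_pickle_by_reference():
    """Pickle a module-level function, then unpickle it where its module is missing."""
    with tempfile.TemporaryDirectory() as tmp:
        # Write a tiny stand-in for run_udf.py (hypothetical module name).
        with open(os.path.join(tmp, "run_udf_demo.py"), "w") as f:
            f.write("def multi_by_2(x):\n    return 2 * x\n")
        sys.path.insert(0, tmp)
        try:
            importlib.invalidate_caches()
            mod = importlib.import_module("run_udf_demo")
            # Only the module name and function name are stored, not the code.
            payload = pickle.dumps(mod.multi_by_2)
        finally:
            # Simulate a worker process that cannot see the module.
            sys.path.remove(tmp)
            sys.modules.pop("run_udf_demo", None)
    importlib.invalidate_caches()
    try:
        pickle.loads(payload)
    except ModuleNotFoundError as exc:
        return str(exc)
    return None


print(demo_pickle_by_reference())  # prints: No module named 'run_udf_demo'
```

If that is what happens in your case, making run_udf.py available to the workers, for example with spark-submit --py-files or spark.sparkContext.addPyFile("run_udf.py"), should let the import succeed. I have not tried this on your exact layout, though.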
You could probably also get away with
return pdf.assign(v=pdf.v*2 - pdf.v.mean())
but I haven't tested that so I am not 100% sure.
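One way to check this without Spark is to run both expressions on the same groups in plain pandas. The sketch below does that on the data from the question. The helper names with_helper and inlined are made up for illustration, and iterating over groupby only mimics how a grouped-map UDF receives one pandas.DataFrame per id.

```python
import pandas as pd


def multi_by_2(x):
    return 2 * x


def with_helper(pdf):
    # Same expression as in the UDF body, using the helper function.
    return pdf.assign(v=multi_by_2(pdf.v) - pdf.v.mean())


def inlined(pdf):
    # Shortcut with the multiplication written inline.
    return pdf.assign(v=pdf.v * 2 - pdf.v.mean())


df = pd.DataFrame({"id": [1, 1, 2, 2, 2], "v": [1.0, 2.0, 3.0, 5.0, 10.0]})

# Apply each variant group by group, one pandas.DataFrame per id.
groups = [group for _, group in df.groupby("id")]
result_helper = pd.concat([with_helper(g) for g in groups])
result_inlined = pd.concat([inlined(g) for g in groups])

# True when both variants produce identical frames.
print(result_helper.equals(result_inlined))
```

This only compares the pandas logic. It says nothing about how either version behaves once Spark ships it to the workers.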
Upvotes: 1