Liky

Reputation: 1247

Functions not recognised inside Pandas UDF function

I am using a Pandas UDF in PySpark.

I have a main file main.py with:

from pyspark.sql import SparkSession
from run_udf import compute


def main():
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    df = compute(df)
    df.show()
    spark.stop()


if __name__ == "__main__":
    main()

And a run_udf.py file that contains my UDF function and another function (that multiplies a single variable by 2):

from pyspark.sql.functions import pandas_udf, PandasUDFType


def multi_by_2(x):
    return 2 * x


def compute(df):

    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=multi_by_2(v) - v.mean())

    df = df.groupby("id").apply(subtract_mean)

    return df

By running main.py I get the following error: "No module named 'run_udf'". In this configuration, subtract_mean() does not seem to be able to access the function multi_by_2().
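For what it's worth, the error message itself suggests the executor processes cannot import run_udf. I believe the module can be shipped explicitly, either via spark-submit --py-files run_udf.py main.py or from the driver code; a sketch (untested in my setup):

spark = SparkSession.builder.getOrCreate()
# make run_udf.py importable on the executors, so the pickled UDF
# can resolve multi_by_2 (path assumed relative to where main.py runs)
spark.sparkContext.addPyFile("run_udf.py")

Apart from that, I found two workarounds, but I don't know whether they follow best practices: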

Method 1: move the function inside compute. Not ideal, as I would have to copy the function each time I use another pandas_udf() function; we lose the concept of a 'reusable' function.

def compute(df):
    def multi_by_2(x):
        return 2 * x

    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=multi_by_2(v) - v.mean())

    df = df.groupby("id").apply(subtract_mean)

    return df

Method 2: pass the multiplying function as a parameter of compute.

main.py

from pyspark.sql import SparkSession
from run_udf import compute
def multi_by_2(x):
    return 2 * x

def main():
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    df = compute(df, multi_by_2)
    df.show()
    spark.stop()


if __name__ == "__main__":
    main()

run_udf.py

from pyspark.sql.functions import pandas_udf, PandasUDFType

def compute(df, multi_by_2):
    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=multi_by_2(v) - v.mean())

    df = df.groupby("id").apply(subtract_mean)

    return df

The two solutions I found seem to be a bit hacky. Is there any better way to tackle this issue?

Upvotes: 3

Views: 1354

Answers (1)

Robin G.

Reputation: 33

I know this reply comes a while after you've posted the question but I hope it can still be helpful!

Why do you want to wrap this in a nested function? Also, as far as I am aware, calling a function with a Spark DataFrame as the argument is not commonly done, so maybe you could try something like the following for your main script:

from pyspark.sql import SparkSession
from run_udf import subtract_mean_udf

def main():
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))
    df = df.groupby("id").apply(subtract_mean_udf)
    df.show()
    spark.stop()

if __name__ == "__main__":
    main()

And the following for the run_udf.py script:

from pyspark.sql.functions import pandas_udf, PandasUDFType

def multi_by_2(x):
    return 2 * x

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean_udf(pdf):
    # pdf is a pandas.DataFrame; the schema must be spelled out here,
    # since no df is in scope in this module
    return pdf.assign(v=multi_by_2(pdf.v) - pdf.v.mean())

Most of this information is taken from a Databricks notebook on Pandas UDFs.

You could probably also get away with

return pdf.assign(v=pdf.v * 2 - pdf.v.mean())

but I haven't tested that so I am not 100% sure.
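As a side note (this goes beyond the notebook, so treat it as a hedged suggestion): on Spark 3.0+ the PandasUDFType.GROUPED_MAP style shown above is deprecated in favour of GroupedData.applyInPandas, which takes a plain function plus an explicit schema. A minimal sketch:

def subtract_mean(pdf):
    # plain module-level function, so helpers like multi_by_2 resolve
    # through normal imports (the module must still reach the workers)
    return pdf.assign(v=multi_by_2(pdf.v) - pdf.v.mean())

df = df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double")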

Upvotes: 1
