devarsh patel

Reputation: 103

How to use Pandas UDF in Class

I am trying to figure out how to use self in a Pandas UDF applied with GroupBy.apply inside a class method in Python, and also how to pass arguments to it. I have tried many different ways but couldn't make it work. I also searched the internet extensively for an example of a Pandas UDF used inside a class with self and arguments, but could not find anything like that. I know how to do all of the above with pandas' GroupBy.apply.

The only way I could make it work was by declaring it a static method:

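import pickle

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, FloatType, BinaryType
from xgboost import XGBRegressor

class Train: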
    return_type = StructType([
        StructField("div_nbr", FloatType()),
        StructField("store_nbr", FloatType()),
        StructField("model_str", BinaryType())
    ])
    function_type = PandasUDFType.GROUPED_MAP

    def __init__(self):
        ............

    def run_train(self):
        output = sp_df.groupby(['A', 'B']).apply(self.model_train)
        output.show(10)

    @staticmethod
    @pandas_udf(return_type, function_type)
    def model_train(pd_df):
        features_name = ['days_into_year', 'months_into_year', 'minutes_into_day', 'hour_of_day', 'recency']

        X = pd_df[features_name].copy()
        Y = pd.DataFrame(pd_df['trans_type_value']).copy()

        estimator_1 = XGBRegressor(max_depth=3, learning_rate=0.1, n_estimators=300, verbosity=1,
                                   objective='reg:squarederror', booster='gbtree', n_jobs=-1, gamma=0,
                                   min_child_weight=5, max_delta_step=0, subsample=0.6, colsample_bytree=0.8,
                                   colsample_bylevel=1, colsample_bynode=1, reg_alpha=0, reg_lambda=1,
                                   scale_pos_weight=1, base_score=0.5, random_state=1234, missing=None,
                                   importance_type='gain')
        estimator_1.fit(X, Y)
        df_to_return = pd_df[['div_nbr', 'store_nbr']].drop_duplicates().copy()
        df_to_return['model_str'] = pickle.dumps(estimator_1)

        return df_to_return

What I would actually like to achieve is to declare return_type, function_type and features_name in __init__(), use them in the Pandas UDF, and also pass parameters to be used inside the function when calling GroupBy.apply.

If anyone could help me out, I would highly appreciate it. I am a bit of a newbie to PySpark.

Upvotes: 8

Views: 2661

Answers (1)

Michael Berk

Reputation: 725

Background

Pandas UDF Lifecycle:

  1. The executor's JVM converts the input rows of the Spark DataFrame into Arrow record batches.
  2. Those batches are streamed to a Python worker process, which the executor launches on demand (and may reuse).
  3. Inside the Python worker, PyArrow converts the batches to pandas Series/DataFrames and your pandas_udf code runs on them.
  4. The pandas output is converted back to Arrow record batches.
  5. The Python worker streams the batches back to the executor.
  6. The executor decodes the Arrow data back into Spark rows.
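For the grouped-map case in the question, steps 3 and 4 amount to "call your function once per group, with that group as a pandas DataFrame, and stack the results". The sketch below imitates only that part locally with plain pandas, which is also handy for testing a grouped-map function before running it on Spark. `simulate_grouped_map` and `mean_per_store` are hypothetical helpers, not part of PySpark, and the sketch skips the Arrow conversion and the separate Python process entirely.

```python
import pandas as pd


def simulate_grouped_map(pdf, keys, func):
    # Hypothetical local helper: hands each group to `func` as its own pandas
    # DataFrame and concatenates the results. Unlike a Spark executor, there is
    # no Arrow conversion and no separate Python worker process here.
    outputs = [
        func(group.reset_index(drop=True))
        for _, group in pdf.groupby(keys, sort=True)
    ]
    return pd.concat(outputs, ignore_index=True)


def mean_per_store(pdf: pd.DataFrame) -> pd.DataFrame:
    # Example grouped-map function: one output row per group.
    return pd.DataFrame({
        "store_nbr": [pdf["store_nbr"].iloc[0]],
        "mean_value": [pdf["value"].mean()],
    })


data = pd.DataFrame({
    "store_nbr": [1, 1, 2, 2, 2],
    "value": [1.0, 3.0, 2.0, 4.0, 6.0],
})
result = simulate_grouped_map(data, ["store_nbr"], mean_per_store)
print(result)
```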

The Problem:

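When the UDF references extra data, such as a class's self, that data has to be serialized and shipped to the executors along with the function. PyArrow only handles the column data; the function itself and everything it closes over are pickled (PySpark uses cloudpickle for this). Referencing self therefore drags the whole object along, and if the instance holds something that cannot be pickled, such as a SparkSession, SparkContext or DataFrame, serialization fails. So you must either 1) create a wrapper function and reference only specific, serializable Python values within the pandas_udf, or 2) use a @staticmethod to remove the need for self.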
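The difference between referencing self and referencing a local copy can be seen without Spark at all. In this plain-Python sketch, a nested function's `__closure__` lists the objects that would have to be shipped with it, and the standard `pickle` module stands in for the cloudpickle serializer PySpark uses for functions. `Holder`, `captured` and the `threading.Lock` attribute (a stand-in for something like a SparkSession stored on the instance) are all hypothetical.

```python
import pickle
import threading


class Holder:
    def __init__(self, s):
        self.s = s
        # Stand-in for an attribute that cannot be pickled, e.g. a
        # SparkSession or DataFrame kept on the instance.
        self.lock = threading.Lock()

    def closure_over_self(self):
        def add(x):
            return x + self.s  # the free variable is `self`, the whole object
        return add

    def closure_over_local(self):
        local_s = self.s  # copy only the value the function needs

        def add(x):
            return x + local_s  # the free variable is just the string
        return add


def captured(fn):
    """Return the objects a nested function closes over."""
    return [cell.cell_contents for cell in fn.__closure__]


h = Holder("_hi")
print(captured(h.closure_over_self()))   # the Holder instance itself
print(captured(h.closure_over_local()))  # ['_hi']

pickle.dumps(captured(h.closure_over_local()))  # fine: only a string
try:
    pickle.dumps(vars(h))  # the instance state includes the lock
except TypeError as exc:
    print("cannot serialize self:", exc)
```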

The Solutions

1 - Pandas UDF with a Parameter in a Class: wrap the UDF in a method and copy the attribute you need into a local variable inside that wrapper - src. Note that every variable referenced inside the pandas_udf is pickled along with the function, so it must be serializable: plain values such as strings, numbers, lists and dicts are fine, but an object that holds a SparkSession, SparkContext or DataFrame is not.

import pandas as pd
from pyspark.sql.functions import pandas_udf

class my_class:
  def __init__(self, s):
    self.s = s

  def wrapper_add_s(self, column):
    local_s = self.s  # local copy, so the UDF closes over a string, not over self

    @pandas_udf("string")
    def add_s(column: pd.Series) -> pd.Series:
      return column + f'_{local_s}'

    return add_s(column)

  def add_col(self, df):
    return df.withColumn("Name", self.wrapper_add_s("Name"))

c = my_class(s='hi')
df = c.add_col(df)
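Applied to the question's grouped-map case, the same wrapper idea lets __init__ hold the feature list, target name and output schema, while the function that runs on the executors only closes over local copies. This is a sketch under assumptions: it uses `GroupedData.applyInPandas` (Spark 3.0+, the successor to `PandasUDFType.GROUPED_MAP` with `groupby().apply()`) with a DDL schema string, the method name `make_model_train` is hypothetical, and a NumPy least-squares fit stands in for `XGBRegressor` so the sketch runs without xgboost.

```python
import pickle

import numpy as np
import pandas as pd


class Train:
    def __init__(self, features_name, target, group_cols):
        self.features_name = list(features_name)
        self.target = target
        self.group_cols = list(group_cols)
        # DDL-formatted schema string, which applyInPandas accepts in place of
        # a StructType; the grouping keys are returned as floats.
        self.return_schema = ", ".join(
            [f"{c} float" for c in self.group_cols] + ["model_str binary"]
        )

    def make_model_train(self):
        # Copy attributes into locals so the nested function does not close
        # over `self` (which might hold a SparkSession or DataFrames).
        features_name = self.features_name
        target = self.target
        group_cols = self.group_cols

        def model_train(pd_df: pd.DataFrame) -> pd.DataFrame:
            X = pd_df[features_name].to_numpy(dtype=float)
            y = pd_df[target].to_numpy(dtype=float)
            # Stand-in model: ordinary least squares instead of XGBRegressor.
            coef, *_ = np.linalg.lstsq(X, y, rcond=None)
            out = pd_df[group_cols].drop_duplicates().astype(float)
            out["model_str"] = pickle.dumps(coef)  # one pickled model per group
            return out.reset_index(drop=True)

        return model_train

    def run_train(self, sp_df):
        # applyInPandas takes a plain pandas function plus the output schema.
        return sp_df.groupBy(*self.group_cols).applyInPandas(
            self.make_model_train(), schema=self.return_schema
        )
```

Because `make_model_train()` returns an ordinary pandas function, it can be unit-tested on a small pandas DataFrame before it is handed to Spark, e.g. via `Train(features_name, 'trans_type_value', ['div_nbr', 'store_nbr']).run_train(sp_df)`.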

2 - Pandas UDF without a Parameter in a Class: use the @staticmethod decorator

import pandas as pd
from pyspark.sql.functions import pandas_udf

class my_class:
  def __init__(self):
    pass

  @staticmethod
  @pandas_udf("string")
  def add_s(column: pd.Series) -> pd.Series:
    return column + ' static string'

  def add_col(self, df):
    return df.withColumn("Name", self.add_s("Name"))

c = my_class()
df = c.add_col(df)
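Why the @staticmethod is needed: assuming, as in recent PySpark versions, that `pandas_udf` returns an ordinary Python function, storing that function on a class turns it into a method, so `self.add_s(...)` would pass the instance as an extra first argument. This plain-Python sketch shows the binding behaviour with a hypothetical stand-in function `udf_like` in place of a real UDF.

```python
def udf_like(column):
    # Stand-in for the plain function object that pandas_udf returns.
    return column + " static string"


class WithoutStatic:
    # A function stored on a class is bound on access, so
    # instance.add_s("Name") becomes udf_like(instance, "Name").
    add_s = udf_like


class WithStatic:
    # staticmethod disables binding, so instance.add_s("Name")
    # becomes udf_like("Name").
    add_s = staticmethod(udf_like)


print(WithStatic().add_s("Name"))  # Name static string

try:
    WithoutStatic().add_s("Name")
except TypeError as exc:
    # The instance was passed as an extra positional argument.
    print("TypeError:", exc)
```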

Not in a Class

If you're looking for a simple way to pass a parameter to a pandas_udf outside of a class, use this... - src

import pandas as pd
from pyspark.sql.functions import pandas_udf

def wrapper_add_s(column, s):

  @pandas_udf("string")
  def add_s(column: pd.Series) -> pd.Series:
    return column + f'_{s}'

  return add_s(column)


df = df.withColumn("Name", wrapper_add_s("Name", s='hi'))
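A variant of the same idea that also makes the pandas logic testable on its own: a hypothetical factory `make_add_s` returns the undecorated function, and `pandas_udf` is applied only where the column is built. Calling `pandas_udf("string")` with just the return type gives back a decorator, which is what the `@pandas_udf("string")` syntax uses.

```python
import pandas as pd


def make_add_s(s):
    """Return the plain pandas function, with `s` captured in a closure."""
    def add_s(column: pd.Series) -> pd.Series:
        return column + f"_{s}"
    return add_s


# The pandas logic can be checked without Spark:
print(make_add_s("hi")(pd.Series(["a", "b"])).tolist())  # ['a_hi', 'b_hi']

# With PySpark installed, wrap it only at the point of use:
# from pyspark.sql.functions import pandas_udf
# df = df.withColumn("Name", pandas_udf("string")(make_add_s("hi"))("Name"))
```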

Upvotes: 5
