Reputation: 103
I am trying to figure out how to use self in PandasUDF.GroupBy.Apply inside a class method in Python, and also pass arguments to it. I have tried many different approaches but could not make any of them work. I also searched extensively for an example of a PandasUDF used inside a class with self and arguments, but could not find one. I know how to do all of the above with Pandas.GroupBy.Apply.
The only way I could make it work was by declaring the method as a static method:
    class Train:
        return_type = StructType([
            StructField("div_nbr", FloatType()),
            StructField("store_nbr", FloatType()),
            StructField("model_str", BinaryType())
        ])
        function_type = PandasUDFType.GROUPED_MAP

        def __init__(self):
            ............

        def run_train(self):
            output = sp_df.groupby(['A', 'B']).apply(self.model_train)
            output.show(10)

        @staticmethod
        @pandas_udf(return_type, function_type)
        def model_train(pd_df):
            features_name = ['days_into_year', 'months_into_year', 'minutes_into_day', 'hour_of_day', 'recency']
            X = pd_df[features_name].copy()
            Y = pd.DataFrame(pd_df['trans_type_value']).copy()
            estimator_1 = XGBRegressor(max_depth=3, learning_rate=0.1, n_estimators=300, verbosity=1,
                                       objective='reg:squarederror', booster='gbtree', n_jobs=-1, gamma=0,
                                       min_child_weight=5, max_delta_step=0, subsample=0.6, colsample_bytree=0.8,
                                       colsample_bylevel=1, colsample_bynode=1, reg_alpha=0, reg_lambda=1,
                                       scale_pos_weight=1, base_score=0.5, random_state=1234, missing=None,
                                       importance_type='gain')
            estimator_1.fit(X, Y)
            df_to_return = pd_df[['div_nbr', 'store_nbr']].drop_duplicates().copy()
            df_to_return['model_str'] = pickle.dumps(estimator_1)
            return df_to_return
What I would actually like to do is declare return_type, function_type, and features_name in __init__(), use them in the PandasUDF, and also pass parameters to be used inside the function when calling PandasUDF.GroupBy.Apply.
If anyone could help me out, I would highly appreciate it. I am fairly new to PySpark.
Upvotes: 8
Views: 2661
Reputation: 725
Pandas UDF Lifecycle:
The Problem:
When working with extra data, such as a class's self, the pandas UDF still needs to serialize and send that data. Serializing complex Python objects like classes is not in PyArrow's capabilities, so you must either 1) create a wrapper function and reference only specific serializable Python types within the pandas_udf, or 2) use a @staticmethod to negate the need for self.
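One way to see what a function would have to carry along with it is to inspect its closure. The sketch below uses plain Python only (no Spark); the class `Trainer`, the attribute `big_state`, and the helper `captured` are hypothetical names made up for illustration. It shows that a nested function referencing `self.suffix` holds on to the entire instance, while one referencing a local copy holds only the string.

```python
class Trainer:
    def __init__(self, suffix):
        self.suffix = suffix
        # Stand-in for heavy or non-serializable state (sessions, DataFrames, ...).
        self.big_state = list(range(1000))

    def make_fn_using_self(self):
        def add_suffix(value):
            # Referencing self.suffix captures the whole instance.
            return f"{value}_{self.suffix}"
        return add_suffix

    def make_fn_using_local(self):
        local_suffix = self.suffix  # copy the one value that is needed

        def add_suffix(value):
            # Only the local string is captured here.
            return f"{value}_{local_suffix}"
        return add_suffix


def captured(fn):
    """Return the objects stored in fn's closure cells."""
    return [cell.cell_contents for cell in (fn.__closure__ or ())]


t = Trainer("hi")
via_self = t.make_fn_using_self()
via_local = t.make_fn_using_local()

print(captured(via_self))   # a one-element list holding the Trainer instance
print(captured(via_local))  # a one-element list holding the string 'hi'
```

Both functions return the same result when called; the difference is only in what travels with them, which is why the local-copy pattern is used in the wrapper approach.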
1 - Pandas UDF with a Parameter in a Class: wrap the method with a function and create a local variable within that wrapper. Note that all variables that are referenced within the pandas_udf must be supported by PyArrow. Most Python types are supported, but classes are not.
    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    class my_class:
        def __init__(self, s):
            self.s = s

        def wrapper_add_s(self, column):
            local_s = self.s  # create a local copy of s to be referenced by udf

            @pandas_udf("string")
            def add_s(column: pd.Series) -> pd.Series:
                return column + f'_{local_s}'

            return add_s(column)

        def add_col(self, df):
            return df.withColumn("Name", self.wrapper_add_s("Name"))

    # df is an existing Spark DataFrame with a string column "Name"
    c = my_class(s='hi')
    c.add_col(df)
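The same wrapper idea can be carried over to the grouped-map case from the question. The following is a minimal sketch under several assumptions, not a definitive implementation: `make_group_trainer` is a hypothetical helper, the dictionary of means is a stand-in for the XGBoost model, and the Spark call uses `applyInPandas`, which is available on grouped DataFrames from Spark 3.0 onward (on older versions the GROUPED_MAP `pandas_udf` would take its place). The schema and column lists live in `__init__`, and only plain lists and strings end up inside the function that Spark runs.

```python
import pickle

import pandas as pd


def make_group_trainer(feature_names, target_name, key_names):
    """Build a plain pandas function that closes over simple values only."""
    def train_group(pdf: pd.DataFrame) -> pd.DataFrame:
        # Stand-in "model": per-feature means plus the target mean.
        # A real estimator (e.g. XGBRegressor) would be fitted here instead.
        model = {
            "feature_means": pdf[feature_names].mean().to_dict(),
            "target_mean": float(pdf[target_name].mean()),
        }
        out = pdf[key_names].drop_duplicates().copy()
        out["model_str"] = [pickle.dumps(model)] * len(out)
        return out
    return train_group


class Train:
    def __init__(self, feature_names, target_name, key_names, return_schema):
        self.feature_names = feature_names
        self.target_name = target_name
        self.key_names = key_names
        self.return_schema = return_schema  # DDL string or StructType

    def run_train(self, sp_df):
        # The factory receives copies of the attributes, so the function
        # handed to Spark never references self.
        train_group = make_group_trainer(
            self.feature_names, self.target_name, self.key_names
        )
        return sp_df.groupby(self.key_names).applyInPandas(
            train_group, schema=self.return_schema
        )


# Local check with pandas only; no Spark session is needed for this part.
pdf = pd.DataFrame({
    "div_nbr": [1.0, 1.0],
    "store_nbr": [7.0, 7.0],
    "recency": [2.0, 4.0],
    "trans_type_value": [10.0, 20.0],
})
fn = make_group_trainer(["recency"], "trans_type_value", ["div_nbr", "store_nbr"])
result = fn(pdf)
model = pickle.loads(result["model_str"].iloc[0])
print(model)
```

With a Spark DataFrame `sp_df` at hand, usage would look like `Train(["recency"], "trans_type_value", ["div_nbr", "store_nbr"], "div_nbr float, store_nbr float, model_str binary").run_train(sp_df)`. Keeping the per-group logic in a plain function also makes it testable with pandas alone, as the local check shows.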
2 - Pandas UDF without a Parameter in a Class: use the @staticmethod decorator.
    class my_class:
        def __init__(self):
            pass

        @staticmethod
        @pandas_udf("string")
        def add_s(column: pd.Series) -> pd.Series:
            return column + ' static string'

        def add_col(self, df):
            # the static UDF uses no instance state, so self is never referenced inside it
            return df.withColumn("Name", self.add_s("Name"))

    c = my_class()
    c.add_col(df)
If you're looking for a simple structure to pass a parameter to a pandas_udf outside of a class, use this:
    def wrapper_add_s(column, s):
        @pandas_udf("string")
        def add_s(column: pd.Series) -> pd.Series:
            return column + f'_{s}'
        return add_s(column)

    df = df.withColumn("Name", wrapper_add_s("Name", s='hi'))
Upvotes: 5