Reputation: 903
I have multiple UDFs in a codebase I inherited. Is there any way to remove and implement without the UDF? I'm running on 1.3B rows, so every bit helps. I considered using apply on a function, but could not get it to work.
# UDF to calculate seasonality, trend, and residual at the same time
@F.pandas_udf(ArrayType(StringType()))
def seasonality_trend_residual_udf(v: pd.Series) -> any:
period_length = 52 # Set your desired period length here
try:
# Decompose the time series using the seasonal_decompose function
decompose_output = seasonal_decompose(v, model='additive', period=period_length, extrapolate_trend='freq')
# Get the last values for each component
array_out = [
str(decompose_output.seasonal.values[-1] if len(decompose_output.seasonal) > 0 else None),
str(decompose_output.trend.values[-1] if len(decompose_output.trend) > 0 else None),
str(decompose_output.resid.values[-1] if len(decompose_output.resid) > 0 else None)
]
except Exception:
# If there's an error in decomposition, return a dictionary with empty string values
array_out = [str(None), str(None), str(None)]
return array_out
# Function to apply seasonality, trend, and residual
def apply_seasonality_trend(df: DataFrame, date_col: str, value_col: str, partition_cols: list, period_length: int) -> DataFrame:
window_spec = Window.partitionBy(partition_cols).orderBy(date_col).rowsBetween(-2 * period_length, 0) # Adjust based on your period length
df = df.withColumn(f'udf_output', seasonality_trend_residual_udf(F.col(value_col)).over(window_spec))
df = (df
.withColumn('category_fcst_volume_sales_seasonality', F.col('udf_output').getItem(0))
.withColumn('category_fcst_volume_sales_trend', F.col('udf_output').getItem(1))
.withColumn('category_fcst_volume_sales_residual', F.col('udf_output').getItem(2))
.drop('udf_output'))
return df
Upvotes: 1
Views: 62