Reputation: 11
I would like to use a pandas UDF to speed up a user-defined function. The type of pandas UDF I am interested in is the one that takes a pandas DataFrame as input and returns a pandas DataFrame (PandasUDFType.GROUPED_MAP).
However, it seems that these pandas UDFs must be used inside a groupby().apply() construct, while in my case I would simply like to apply the pandas UDF to every partition of the PySpark DataFrame, the idea being to transform each partition into a local pandas DataFrame on each executor. In fact, I would like to avoid any kind of groupby, because it would cause data reshuffling.
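To make the intent concrete, here is a minimal pure-pandas sketch of the kind of function I want to run on each partition (`transform_partition` and its logic are made up for illustration):

```python
import pandas as pd

# Grouped-map contract: one pandas DataFrame in (here standing in for
# one partition's rows), one pandas DataFrame out.
def transform_partition(pdf: pd.DataFrame) -> pd.DataFrame:
    # hypothetical per-partition transformation
    return pdf.assign(row_count=len(pdf))

# Illustration on a local slice of data:
local_slice = pd.DataFrame({"x": [1, 2, 3]})
result = transform_partition(local_slice)
```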
Is there a way to achieve this, perhaps by specifying that the groupby should be done by partition, or something similar?
Upvotes: 1
Views: 1000
Reputation: 21
The groupby clause is effectively how you tell Spark to "slice" your PySpark DataFrame: each slice is converted to a pandas DataFrame and processed by your pandas UDF. It is analogous to a for loop iterating over the distinct values of the field (or fields) passed to the groupby clause.
For example, I created an FC PySpark DataFrame and a pandas UDF containing a hierarchical forecasting model at the L4 level, returning the predicted values at the end. The FC DataFrame also contains columns L1-L3 of the company product hierarchy above L4, where L1 is Total. So I can pass L1 to the groupby clause and one model will run, or I can pass L2 and a few models will run in parallel (on a multi-worker cluster). Or I can pass the combination of L1 + L2 if I am not sure whether the hierarchy is crossed in some cases, as below; but the output will always be a forecast at L4, as defined in my pandas UDF (no aggregation of values happens before processing).
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

result_schema = StructType([
    StructField("xy", StringType(), True),
    #...define the remaining output fields
])

# grouped-map function: one pandas DataFrame in, one pandas DataFrame out
# (named forecast_udf to avoid shadowing pyspark.sql.functions.pandas_udf)
def forecast_udf(df_pandas: pd.DataFrame) -> pd.DataFrame:
    #..use df_pandas to fit the model and build df_forecast
    return df_forecast

result_spark_df = (
    input_spark_df
    .groupBy(['L1', 'L2'])
    .applyInPandas(forecast_udf, schema=result_schema)
    .withColumn('run_time', F.current_timestamp())  # timestamp of this run
)
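The "for loop" analogy above can be sketched in plain pandas (my sketch, not Spark itself; `forecast_slice` stands in for the real forecasting model):

```python
import pandas as pd

# Toy stand-in for the FC DataFrame: L1 is Total, L2 splits into two groups.
fc = pd.DataFrame({"L1": ["Total"] * 4,
                   "L2": ["A", "A", "B", "B"],
                   "value": [1.0, 2.0, 3.0, 4.0]})

def forecast_slice(pdf: pd.DataFrame) -> pd.DataFrame:
    # stand-in for the real model: annotate each slice with its mean
    return pdf.assign(mean_value=pdf["value"].mean())

# groupBy(['L1']) -> one slice, so one "model run"
one_run = [forecast_slice(g) for _, g in fc.groupby(["L1"])]

# groupBy(['L1','L2']) -> one slice per (L1, L2) combination;
# on a multi-worker cluster these runs execute in parallel
many_runs = [forecast_slice(g) for _, g in fc.groupby(["L1", "L2"])]
```

Each slice is handed to the function whole, which matches the point above: no aggregation of values happens before your UDF sees the data.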
Upvotes: 0