Reputation: 682
I'm using the pandas API on Spark on Azure Databricks, and I have a very big pyspark.pandas.DataFrame, call it data, containing sales information (as time series) for different products. Each product is identified by the sku_id column and the sales values are in the target column. I have a function forecast that computes a forecast for each sku_id. I thought it would be as easy as data.groupby("sku_id").target.apply(forecast), but the time series that enter the forecast function are partitioned, i.e. forecast is not seeing the complete time series for each sku. I understand that some functions like len can operate on batches and then merge the results in an easy way (adding the results of each partition), but that is not the case for forecast: it needs the whole time series for each sku. I verified that the time series are being partitioned by raising an error when forecast is evaluated on a short series, like this:
def forecast(y):
    if len(y) <= 100:
        raise NotImplementedError("Serie too short")
    # Do the forecast...
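For reference, a simplified sketch of the call and of a quick group-size check (the check itself is illustrative, not part of my pipeline):

# the failing call: pandas-on-Spark groupby-apply over target, per sku_id
result = data.groupby("sku_id").target.apply(forecast)

# sanity check: number of observations per sku_id
data.groupby("sku_id").target.count()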
I'm sure that for each sku_id the number of observations is much larger than 100, but the apply fails with the NotImplementedError, so the time series are indeed being partitioned.
So I want to understand how to apply a method like forecast in a groupby operation without partitioning the time series. I have not found documentation about it, and I would like to know whether this can be done and whether it is good practice.
Or maybe the approach should be different: how can I partition my data frame so that the whole series of each sku_id ends up in the same partition?
EDIT

It works with applyInPandas on the Spark DataFrame, and also when I annotate the return type of forecast:

def forecast(y) -> float:
    if len(y) <= 100:
        raise NotImplementedError("Serie too short")
    # Do the forecast...

Why does the latter make a difference? The documentation says:

"this API executes the function once to infer the type which is potentially expensive, for instance, when the dataset is created after aggregations or sorting. To avoid this, specify return type in func, for instance, as below"

Probably it tries to infer the return type to define the schema, evaluating the function on a subsample first?
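For completeness, this is roughly the applyInPandas route I mean (a sketch: the output schema, column names, and the dummy forecast value are illustrative, not my actual code):

import pandas as pd

sdf = data.to_spark()  # pandas-on-Spark DataFrame -> plain Spark DataFrame

def forecast_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf contains *all* rows of one sku_id, not just a partition of them
    if len(pdf) <= 100:
        raise NotImplementedError("Serie too short")
    # Do the forecast... (dummy aggregate as a stand-in)
    return pd.DataFrame({"sku_id": [pdf["sku_id"].iloc[0]],
                         "forecast": [pdf["target"].mean()]})

results = (
    sdf.groupBy("sku_id")
       .applyInPandas(forecast_group, schema="sku_id string, forecast double")
)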
Upvotes: 2
Views: 855
Reputation: 5125
If you look at the code itself, it provides some hints: it eventually leads you down the path to where it infers the schema. To me this shows where your code could fail, because that is the point at which it would raise your error.
first = rdd.first()
Diving a little further, it also shows how you could turn off this behaviour and instead use a sampled approach: spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled
Disabling this setting would show you whether that was in fact the issue, provided the sample size you end up with is larger than your required length of 100.
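If you want to experiment with that flag, it can be toggled like any other SQL configuration (a hedged sketch; verifying that this is the relevant knob for your case is up to you):

spark.conf.set("spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled", "false")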
You could also likely fix the issue by removing the raise, which is what causes the failure. You could still track any series with fewer than 100 observations by using an accumulator. Accumulators are made for tracking this kind of data oddity and might be a better call if you don't want to blow things up but still want to count odd situations such as time series with fewer than 100 events.
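A minimal sketch of the accumulator idea (names are illustrative; the forecast body is still up to you):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
short_series_counter = spark.sparkContext.accumulator(0)

def forecast(y):
    if len(y) <= 100:
        short_series_counter.add(1)  # count the oddity instead of raising
        return None
    # Do the forecast...

# After the job has run, inspect the counter on the driver:
print(short_series_counter.value)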
Upvotes: 1
Reputation: 2426
Maybe it helps if you repartition and cache your Spark DataFrame:
data = data\
    .repartition(sc.defaultParallelism, ['sku_id'])\
    .cache()
And don't use .toPandas(), but work with a pandas_udf instead:
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def forecast(y):
    if len(y) <= 100:
        raise NotImplementedError("Serie too short")
    # Do the forecast...

# Then call the UDF to build the forecasts:
results = (
    data
    .groupBy('sku_id')
    .apply(forecast)
)
This was extracted from here; maybe it gives you some good insight.
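As a side note, on newer Spark versions the GROUPED_MAP pandas_udf style shown above is deprecated in favour of applyInPandas; a rough equivalent sketch (same result_schema, same function body, but undecorated) would be:

def forecast(pdf):
    # pdf is a pandas DataFrame with all rows of one sku_id
    if len(pdf) <= 100:
        raise NotImplementedError("Serie too short")
    # Do the forecast...
    return pdf

results = (
    data
    .groupBy('sku_id')
    .applyInPandas(forecast, schema=result_schema)
)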
Upvotes: 1