Andrex

Reputation: 682

Understanding pyspark apply on groupby

I'm using the pandas API on Spark on Azure Databricks, and I have a very big pyspark.pandas.DataFrame, call it data, containing sales information (as time series) for different products. Each product is identified by the sku_id column, and the sales figures live in the target column. I have a function forecast that produces a forecast for each sku_id. I thought it would be as easy as data.groupby("sku_id").target.apply(forecast), but the time series that enter the forecast function are partitioned, i.e. forecast does not see the complete time series for each SKU.

I understand that some functions like len can operate on batches and then merge the results easily (by adding the results of each partition), but forecast is not one of them: it needs the whole time series for each SKU. I verified that the time series are being partitioned by raising an error whenever forecast is evaluated on a short series, like this:

def forecast(y):
    if len(y) <= 100:
        raise NotImplementedError("Serie too short")
    # Do the forecast...

I'm sure that for each sku_id there are more than 100 observations (far more), yet the apply fails with the NotImplementedError, so the time series are indeed being partitioned.

So I want to understand how to apply a function like forecast in a groupby operation without the time series being split up. I have not found documentation about this, and I want to know whether it can be done and whether it is good practice.

Or maybe the approach should be different: how can I partition my data frame so that each sku_id's series ends up in the same partition?

EDIT

It works for

def forecast(y) -> float:
    if len(y) <= 100:
        raise NotImplementedError("Serie too short")
    # Do the forecast...

Why does the latter work? The documentation says:

this API executes the function once to infer the type which is potentially expensive, for instance, when the dataset is created after aggregations or sorting. To avoid this, specify return type in func, for instance, as below

Presumably it executes the function on a subsample first to infer the return type and define the schema?
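For what it's worth, here is a minimal, self-contained sketch of the two call styles on a toy pandas-on-Spark frame (the toy data, the dummy forecast body, and the claim that the no-hint version runs on a sampled subset are my reading of the docs, not something verified against the pandas-on-Spark source):

import pyspark.pandas as ps

# Toy stand-in for the real data frame (same sku_id / target columns).
toy = ps.DataFrame({
    "sku_id": ["a"] * 150 + ["b"] * 150,
    "target": list(range(300)),
})

def forecast_no_hint(y):
    # Without a return type hint, pandas-on-Spark executes the function once
    # on a limited sample of rows to infer the output schema; on a big frame
    # that sample can contain truncated groups, which trips this branch.
    if len(y) <= 100:
        raise NotImplementedError("Serie too short")
    return float(y.mean())          # placeholder for the real forecast

def forecast_hinted(y) -> float:
    # With an explicit return type hint, the inference pass is skipped and
    # the function only sees complete per-sku series.
    if len(y) <= 100:
        raise NotImplementedError("Serie too short")
    return float(y.mean())          # placeholder for the real forecast

result = toy.groupby("sku_id").target.apply(forecast_hinted)
print(result.to_pandas())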

Upvotes: 2

Views: 855

Answers (2)

Matt Andruff

Reputation: 5125

If you look at the code itself it provides some hints. It eventually leads you down the path to where the schema is inferred, which is where your code could fail, because that is the point at which it would raise your error:

 first = rdd.first()

Diving a little further, it also shows how you could turn off this behaviour and instead use a sampled approach: spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled

Disabling this setting would show you whether that was in fact the issue, provided the sample it then uses is at least bigger than your required length of 100.
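If you want to try that, a minimal sketch (the config key is the one named above; whether it affects this particular inference path on your runtime is something to verify, not a given):

# Assumes `spark` is the active SparkSession (as on Databricks).
spark.conf.set(
    "spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled",
    "false",
)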

You could also likely fix the issue by removing the raise, which is what triggers the failure. You could still track any series with fewer than 100 observations by using an accumulator. Accumulators are made for tracking this kind of data oddity, and they might be a better call if you don't want to blow things up but still want to count odd situations such as series with fewer than 100 events, as in the sketch below.
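A rough sketch of the accumulator idea (the NaN fallback and the dummy forecast body are placeholders, not part of the original answer):

# Count short series with an accumulator instead of raising, so the job
# completes and the driver can still report how many groups were too short.
short_series = spark.sparkContext.accumulator(0)

def forecast(y) -> float:
    if len(y) <= 100:
        short_series.add(1)
        return float("nan")         # placeholder fallback instead of raising
    return float(y.mean())          # placeholder for the real forecast

result = data.groupby("sku_id").target.apply(forecast)
result.to_pandas()                  # force execution before reading the counter
print("series with 100 or fewer points:", short_series.value)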

Upvotes: 1

Luiz Viola

Reputation: 2426

Maybe it helps if you repartition and cache your Spark DataFrame:

data = data\
            .repartition(sc.defaultParallelism, ['sku_id'])\
            .cache()

And don't use .toPandas(); work with a pandas_udf instead:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def forecast(y):
    if len(y) <= 100:
        raise NotImplementedError("Serie too short")
    # Do the forecast...

#Then call the UDF to build the forecasts:

results = (
  data
    .groupBy('sku_id')
    .apply(forecast)
    )

This was extracted from here. Maybe it gives you some good insight.
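As a side note, PandasUDFType.GROUPED_MAP is deprecated in recent Spark releases; a rough equivalent using GroupedData.applyInPandas with an explicit result_schema could look like the sketch below (the schema columns and the dummy forecast are assumptions, not taken from the linked post, and data is the repartitioned Spark DataFrame from above):

import pandas as pd
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# Explicit output schema for the grouped result (placeholder column names).
result_schema = StructType([
    StructField("sku_id", StringType()),
    StructField("forecast", DoubleType()),
])

def forecast(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf is the complete pandas DataFrame for one sku_id group.
    if len(pdf) <= 100:
        raise NotImplementedError("Serie too short")
    return pd.DataFrame({
        "sku_id": [pdf["sku_id"].iloc[0]],
        "forecast": [float(pdf["target"].mean())],  # placeholder forecast
    })

results = (
    data
    .groupBy("sku_id")
    .applyInPandas(forecast, schema=result_schema)
)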

Upvotes: 1
