user9722371

Reputation: 31

How to run an exponential weighted moving average in PySpark

I am trying to run an exponential weighted moving average in PySpark using a Grouped Map Pandas UDF. It doesn't work though:

def ExpMA(myData):

    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.functions import PandasUDFType
    import pandas as pd

    df = myData
    group_col = 'Name'
    sort_col = 'Date'

    schema = df.select(group_col, sort_col,'count').schema
    print(schema)

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def ema(pdf):
        Model = pd.DataFrame(pdf.apply(lambda x: x['count'].ewm(span=5, min_periods=1).mean()))
        return Model

    data = df.groupby('Name').apply(ema)

    return data

I also tried writing the EWMA directly in PySpark, without the Pandas UDF, but the problem there is that the EWMA recurrence depends on its own lagged value, so it can't be expressed with ordinary lag/window functions.
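
For reference, the recurrence I mean looks like this in plain Python (a minimal sketch; this is the adjust=False form of pandas' .ewm, while the default adjust=True uses a reweighted variant of the same idea):

def ewma(values, span=5):
    # pandas maps span to the smoothing factor as alpha = 2 / (span + 1)
    alpha = 2.0 / (span + 1)
    out, prev = [], None
    for x in values:
        # each output depends on the *previous* output, which is why
        # this doesn't fit Spark's lag/window functions directly
        prev = x if prev is None else alpha * x + (1 - alpha) * prev
        out.append(prev)
    return out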

Upvotes: 3

Views: 4673

Answers (1)

Alper t. Turker

Reputation: 35249

First of all, your Pandas code is incorrect, so this just won't work, Spark or not. DataFrame.apply operates column by column by default (axis=0), so the lambda receives each column as a Series, and x['count'] tries to index that Series by a label it doesn't have:

pdf.apply(lambda x: x['count'].ewm(span=5, min_periods=1).mean())
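
You can see this on a toy frame (a hypothetical three-row example):

import pandas as pd

pdf = pd.DataFrame({'count': [1, 3, 3]})

# With the default axis=0, apply passes each *column* as x, so x is
# already the 'count' Series and x['count'] is a label lookup on it:
pdf.apply(lambda x: x['count'].ewm(span=5, min_periods=1).mean())
# KeyError: 'count'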

Another problem is the output schema, which, depending on your data, won't really accommodate the result:

  • If you want to add an ewm column, the schema has to be extended.
  • If you want to return only the ewm column, the schema is too large (see the sketch after this list).
  • If you want to just replace the count values in place, the declared type might not match.
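
For the second case, for example, the declared schema would have to list only the columns the UDF actually returns (a hypothetical sketch; the column names are illustrative):

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Schema for a UDF that returns only the group key and the EWMA value:
ewm_only_schema = StructType([
    StructField('Name', StringType()),
    StructField('ewma', DoubleType()),
])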

Let's assume it's the first scenario (I've taken the liberty of rewriting your code a bit):

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import DoubleType, StructField

def exp_ma(df, group_col='Name', sort_col='Date'):
    # Extend the input schema with the new ewma column
    schema = (df.select(group_col, sort_col, 'count')
        .schema.add(StructField('ewma', DoubleType())))

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def ema(pdf):
        # EWMA is order-dependent, so this assumes rows within each
        # group already arrive sorted by the sort column
        pdf['ewma'] = pdf['count'].ewm(span=5, min_periods=1).mean()
        return pdf

    return df.groupby(group_col).apply(ema)

df = spark.createDataFrame(
    [("a", 1, 1), ("a", 2, 3), ("a", 3, 3), ("b", 1, 10), ("b", 8, 3), ("b", 9, 0)], 
    ("name", "date", "count")
)

exp_ma(df).show()
# +----+----+-----+------------------+                                            
# |Name|Date|count|              ewma|
# +----+----+-----+------------------+
# |   b|   1|   10|              10.0|
# |   b|   8|    3| 5.800000000000001|
# |   b|   9|    0|3.0526315789473686|
# |   a|   1|    1|               1.0|
# |   a|   2|    3|               2.2|
# |   a|   3|    3| 2.578947368421052|
# +----+----+-----+------------------+

I don't use Pandas much, so there might be a more elegant way of doing this.
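
For what it's worth, on Spark 3.x the same thing is usually written with applyInPandas instead of a GROUPED_MAP pandas_udf (a sketch under that assumption, otherwise identical to the function above):

from pyspark.sql.types import DoubleType, StructField

def exp_ma_v3(df, group_col='Name', sort_col='Date'):
    schema = (df.select(group_col, sort_col, 'count')
        .schema.add(StructField('ewma', DoubleType())))

    def ema(pdf):
        pdf['ewma'] = pdf['count'].ewm(span=5, min_periods=1).mean()
        return pdf

    # Spark 3.x takes a plain function plus a schema, no decorator needed
    return df.groupby(group_col).applyInPandas(ema, schema=schema)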

Upvotes: 3
