Jess

Reputation: 8700

Window aggregation on many columns in Spark

I'm having trouble doing an aggregation across many columns in PySpark. There are hundreds of boolean columns showing the current state of a system, with a row added every second. The goal is to transform this data to show the number of state changes for every 10-second window.

I planned to do this in two steps: first XOR each boolean value with the previous row's value, then sum over a 10-second window. Here's the rough code I came up with:

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql import types as T, functions as F

from datetime import datetime, timedelta
from random import random
import time

sc = pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]'))
spark = SparkSession(sc)

# create dataframe
num_of_cols = 50
df = spark.createDataFrame(
    [(datetime.now() + timedelta(0, i), *[round(random()) for _ in range(num_of_cols)]) for i in range(10000)],
    ['Time', *[f"M{m+1}" for m in range(num_of_cols)]])
cols = set(df.columns) - set(['Time'])

# Generate changes
data_window = Window.partitionBy(F.minute('Time')).orderBy('Time')
# data_window = Window.orderBy('Time')
df = df.select('Time', *[F.col(m).bitwiseXOR(F.lag(m, 1).over(data_window)).alias(m) for m in cols])

df = df.groupBy(F.window('Time', '10 seconds')) \
    .agg(*[F.sum(m).alias(m) for m in cols]) \
    .withColumn('start_time', F.col('window')['start']) \
    .drop('window')

df.orderBy('start_time').show(20, False)

# Keep UI open
time.sleep(60*60)

With the data_window partitioned by minute, Spark generates 52 stages, each dependent on the last. Increasing num_of_cols increases the number of stages as well. This seems like it should be an embarrassingly parallel problem: compare each row to the previous one, then aggregate over 10-second windows. Removing the partitionBy from data_window lets it run in a single stage, but it forces all the data onto a single partition to do so.

Why are the stages dependent on each other, and is there a better way to write this to improve parallelization? I'd think it should be possible to compute multiple aggregations over the same window at the same time. Eventually this needs to scale to hundreds of columns; are there any tricks to improve performance at that point?
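For what it's worth, the dependency chain shows up in the query plan; appending this to the script above (before the sleep) prints it:

# Print the logical and physical plans; the chained Window/Sort operators
# correspond to the dependent stages visible in the Spark UI.
df.explain(True)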

Upvotes: 1

Views: 926

Answers (1)

Jess

Reputation: 8700

Based on the helpful response from Georg, I came up with the following, which packs all the boolean columns into a single array column so only one window expression is needed:

import pandas as pd
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql import types as T, functions as F

from datetime import datetime, timedelta
from random import random
import time
import pprint


sc = pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]'))
spark = SparkSession(sc)


@F.pandas_udf(T.ArrayType(T.IntegerType()), F.PandasUDFType.GROUPED_AGG)
def pandas_xor(v):
    # The window frame is (currentRow, 1), so v holds the current row's array
    # and, except on the last row of a partition, the next row's array.
    values = v.values
    if len(values) == 1:
        # Last row of the partition: no next row to compare with, report no changes
        return values[0] * False
    elif len(values) == 2:
        # Element-wise XOR of the current and next rows
        return values[0] ^ values[1]
    else:
        raise RuntimeError('Too many values given to pandas_xor: {}'.format(values))


# create dataframe
num_of_cols = 50
df = spark.createDataFrame(
    [(datetime.now() + timedelta(0, i), *[round(random()) for _ in range(num_of_cols)]) for i in range(100000)],
    ['Time', *[f"M{m+1}" for m in range(num_of_cols)]])
cols = set(df.columns) - set(['Time'])

df = df.select('Time', F.array(*cols).alias('data'))

# XOR
data_window = Window.partitionBy(F.minute('Time')).orderBy('Time').rowsBetween(Window.currentRow, 1)
# data_window = Window.orderBy('Time')
df = df.select('Time', pandas_xor(df.data).over(data_window).alias('data'))

df = df.groupBy(F.window('Time', '10 seconds')) \
    .agg(*[F.sum(F.element_at('data', i + 1)).alias(m) for i, m in enumerate(cols)]) \
    .withColumn('start_time', F.col('window')['start']) \
    .drop('window')

df.orderBy('start_time').show(20, False)

# Keep UI open
time.sleep(60*60)

Here are the instructions to run it with Spark 3.0.0-preview2:

  1. Download Spark 3.0.0-preview2

    mkdir contrib
    wget -O contrib/spark-3.0.0-preview2.tgz 'https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=spark/spark-3.0.0-preview2/spark-3.0.0-preview2-bin-hadoop2.7.tgz'
    tar -C contrib -xf contrib/spark-3.0.0-preview2.tgz
    rm contrib/spark-3.0.0-preview2.tgz
    
  2. In the first shell, configure the environment to use PySpark 3.0.0-preview2

    export SPARK_HOME="$(pwd)/contrib/spark-3.0.0-preview2-bin-hadoop2.7"
    export PYTHONPATH="$SPARK_HOME/python/lib/pyspark.zip:$SPARK_HOME/python/lib/py4j-0.10.8.1-src.zip"
    
  3. Kick off the PySpark job

    time python3 so-example.py
    

    View the local Spark run's web UI at http://localhost:4040
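To confirm that the preview build on PYTHONPATH is the one actually being imported (grouped-aggregate pandas UDFs over a bounded window frame need Spark 3.0+), a quick check like this should work:

# Sanity check: verify the Spark 3.0.0-preview2 build on PYTHONPATH is the
# one being picked up; bounded-window pandas UDFs are not available in 2.4.
import pyspark
print(pyspark.__version__)  # should report a 3.0.0 preview version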

Upvotes: 0
