Reputation: 8700
Having trouble doing an aggregation across many columns in PySpark. There are hundreds of boolean columns showing the current state of a system, with a row added every second. The goal is to transform this data to show the number of state changes in every 10-second window.
I planned to do this in two steps: first XOR each boolean value with the previous row's value, then sum over a 10-second window. Here's the rough code I came up with:
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql import types as T, functions as F
from datetime import datetime, timedelta
from random import random
import time
sc = pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]'))
spark = SparkSession(sc)
# create dataframe
num_of_cols = 50
df = spark.createDataFrame(
    [(datetime.now() + timedelta(0, i), *[round(random()) for _ in range(num_of_cols)]) for i in range(10000)],
    ['Time', *[f"M{m+1}" for m in range(num_of_cols)]])
cols = set(df.columns) - set(['Time'])
# Generate changes
data_window = Window.partitionBy(F.minute('Time')).orderBy('Time')
# data_window = Window.orderBy('Time')
df = df.select('Time', *[F.col(m).bitwiseXOR(F.lag(m, 1).over(data_window)).alias(m) for m in cols])
df = df.groupBy(F.window('Time', '10 seconds')) \
    .agg(*[F.sum(m).alias(m) for m in cols]) \
    .withColumn('start_time', F.col('window')['start']) \
    .drop('window')
df.orderBy('start_time').show(20, False)
# Keep UI open
time.sleep(60*60)
With the data_window partitioned by minute, Spark generates 52 stages, each dependent on the last. Increasing num_of_cols increases the number of stages as well. It seems to me this should be an embarrassingly parallelizable problem: compare each row to the last, then aggregate by 10 seconds. Removing the partitionBy from data_window allows it to run in a single stage, but it forces all the data onto a single partition to achieve it.
Why are the stages dependent on each other, and is there a better way to write this to improve parallelization? I'd think it would be possible to do multiple aggregations over the same window at the same time. Eventually this will need to scale to hundreds of columns; are there any tricks to improve performance at that point?
Upvotes: 1
Views: 926
Reputation: 8700
Based on the helpful response from Georg, I came up with the following:
import pandas as pd
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql import types as T, functions as F
from datetime import datetime, timedelta
from random import random
import time
import pprint
sc = pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]'))
spark = SparkSession(sc)
@F.pandas_udf(T.ArrayType(T.IntegerType()), F.PandasUDFType.GROUPED_AGG)
def pandas_xor(v):
    values = v.values
    if len(values) == 1:
        return values[0] * False
    elif len(values) == 2:
        return values[0] ^ values[1]
    else:
        raise RuntimeError('Too many values given to pandas_xor: {}'.format(values))
# create dataframe
num_of_cols = 50
df = spark.createDataFrame(
    [(datetime.now() + timedelta(0, i), *[round(random()) for _ in range(num_of_cols)]) for i in range(100000)],
    ['Time', *[f"M{m+1}" for m in range(num_of_cols)]])
cols = set(df.columns) - set(['Time'])
df = df.select('Time', F.array(*cols).alias('data'))
# XOR
data_window = Window.partitionBy(F.minute('Time')).orderBy('Time').rowsBetween(Window.currentRow, 1)
# data_window = Window.orderBy('Time')
df = df.select('Time', pandas_xor(df.data).over(data_window).alias('data'))
df = df.groupBy(F.window('Time', '10 seconds')) \
    .agg(*[F.sum(F.element_at('data', i + 1)).alias(m) for i, m in enumerate(cols)]) \
    .withColumn('start_time', F.col('window')['start']) \
    .drop('window')
df.orderBy('start_time').show(20, False)
# Keep UI open
time.sleep(60*60)
Here are the instructions to run it with Spark 3.0.0-preview2.
Download Spark 3.0.0-preview2:
mkdir contrib
wget -O contrib/spark-3.0.0-preview2.tgz 'https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=spark/spark-3.0.0-preview2/spark-3.0.0-preview2-bin-hadoop2.7.tgz'
tar -C contrib -xf contrib/spark-3.0.0-preview2.tgz
rm contrib/spark-3.0.0-preview2.tgz
In the first shell, configure the environment to use PySpark 3.0.0:
export SPARK_HOME="$(pwd)/contrib/spark-3.0.0-preview2-bin-hadoop2.7"
export PYTHONPATH="$SPARK_HOME/python/lib/pyspark.zip:$SPARK_HOME/python/lib/py4j-0.10.8.1-src.zip"
Kick off the PySpark job:
time python3 so-example.py
View the local Spark run's web UI at http://localhost:4040
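As an aside, the core of the pandas_xor UDF can be sanity-checked in plain pandas without a Spark session. The standalone helper below (a hypothetical name, just mirroring the UDF body) exercises both branches: a two-row window does an element-wise XOR, and the single-row window at the end of a partition yields all zeros:

```python
import numpy as np
import pandas as pd

def pandas_xor_core(v):
    # Mirrors the body of the pandas_xor UDF above: because the window is
    # rowsBetween(Window.currentRow, 1), v holds at most two array-valued rows.
    values = v.values
    if len(values) == 1:
        return values[0] * False   # last row of a partition: no successor, all zeros
    elif len(values) == 2:
        return values[0] ^ values[1]
    else:
        raise RuntimeError('Too many values given to pandas_xor: {}'.format(values))

pair = pd.Series([np.array([0, 1, 1]), np.array([1, 1, 0])])
print(pandas_xor_core(pair))   # element-wise XOR of the two rows
```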
Upvotes: 0