Reputation: 2067
My Spark (PySpark) ETL that uses a window function has stopped working, and I suspect skew in the data. The window does something like:
from pyspark.sql import Window, functions as F

windowSpec = Window.partitionBy('user').orderBy('time').rowsBetween(1, 1)
next_time = F.lead('time', 1).over(windowSpec)
What if the data has some outlier users with lots of rows? When Spark partitions by user to run the window, I imagine I could get a partition that is too big; I am seeing only two of the many jobs fail (job may be the wrong terminology).
How do I check this? I know I can do df.groupBy('user').count() and look for outlier users, but how do I see how big the partitions the window function needs will be? I'm hoping Spark will automatically put a few big users in one partition and many small users in the others.
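For context, the per-user skew check I already know is just (a sketch against the df and columns above):

(df.groupBy('user')
    .count()
    .orderBy(F.desc('count'))
    .show(10))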
Upvotes: 1
Views: 1285
Reputation: 35219
Window functions with PARTITION BY use the same hash partitioner as standard aggregations. Let's confirm this with some example data:
import string
import random

from pyspark.sql import functions as F
from pyspark.sql.functions import spark_partition_id

random.seed(1)
spark.conf.set("spark.sql.shuffle.partitions", 11)

df = spark.createDataFrame([
    random.sample(string.ascii_letters, 1) + [random.random()]
    for _ in range(10000)
], ("user", "time"))
With aggregation:
(df.groupBy("user")
.count()
.groupBy(spark_partition_id().alias("partition"))
.agg(F.sum("count"))
.orderBy("partition")
.show())
# +---------+----------+
# |partition|sum(count)|
# +---------+----------+
# | 0| 954|
# | 1| 605|
# | 2| 1150|
# | 3| 1339|
# | 4| 922|
# | 5| 751|
# | 6| 562|
# | 7| 579|
# | 8| 1440|
# | 9| 582|
# | 10| 1116|
# +---------+----------+
With window functions:
(df
    .withColumn("next_time", next_time)  # next_time as defined in the question
    # Convert through the RDD API to ensure the Window is not removed from the execution plan
    .rdd.toDF()
    .groupBy(spark_partition_id().alias("partition"))
    .count()
    .orderBy("partition")
    .show())
# +---------+-----+
# |partition|count|
# +---------+-----+
# | 0| 954|
# | 1| 605|
# | 2| 1150|
# | 3| 1339|
# | 4| 922|
# | 5| 751|
# | 6| 562|
# | 7| 579|
# | 8| 1440|
# | 9| 582|
# | 10| 1116|
# +---------+-----+
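The partition sizes match exactly, which confirms that the window's shuffle routes each user to the same partition as the aggregation does. If you want to estimate those sizes up front, without running the window, you can approximate the routing yourself: Spark's HashPartitioning applies a Murmur3 hash (exposed in SQL as hash) modulo the shuffle partition count. A sketch, under the assumption that hash/pmod mirror what the planner does:

n = int(spark.conf.get("spark.sql.shuffle.partitions"))  # 11 in this example

(df.groupBy("user")
    .count()
    # Approximate the shuffle partition each user key is routed to.
    .withColumn("partition", F.expr(f"pmod(hash(user), {n})"))
    .groupBy("partition")
    .agg(F.sum("count").alias("estimated_rows"))
    .orderBy("partition")
    .show())

You can also check the plans directly: both df.groupBy("user").count().explain() and df.withColumn("next_time", next_time).explain() should show an Exchange hashpartitioning(user, 11) node.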
Upvotes: 0