MrCartoonology

Reputation: 2067

How do you see the partition sizes created for a Window function in PySpark?

My Spark (PySpark) ETL job that uses a window function has stopped working. I wonder if the cause is skew in the data. The window does something like

windowSpec = Window.partitionBy('user').orderBy('time').rowsBetween(1, 1)
next_time = F.lead('time', 1).over(windowSpec)

What if the data has some outlier users with lots of data? When Spark partitions by user to do the window, I imagine I could get a partition which is too big. I am seeing only two of the many jobs fail (job may be the wrong terminology).

How do I check this? I know I can do a df.groupBy('user').count() and look for outlier users, but how do I see how big the partitions that the Window function needs will be? I'm hoping Spark will automatically put a few big users in one partition, and a lot of small users in the others.
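
For reference, this is the kind of skew check I have in mind (a minimal sketch, assuming df is the DataFrame being windowed; the "top 20" cutoff is just an arbitrary way to eyeball outliers):

from pyspark.sql import functions as F

# Count rows per user and look at the heaviest users
user_counts = df.groupBy('user').count()
user_counts.orderBy(F.col('count').desc()).show(20)

# Summary stats give a quick sense of how uneven the distribution is
user_counts.select(
    F.min('count').alias('min'),
    F.avg('count').alias('avg'),
    F.max('count').alias('max'),
).show()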

Upvotes: 1

Views: 1285

Answers (1)

Alper t. Turker

Reputation: 35219

Window functions with PARTITION BY use the same hash partitioner as standard aggregations. Let's confirm this with some example data:

import string
import random
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import spark_partition_id

random.seed(1)
spark.conf.set("spark.sql.shuffle.partitions", 11)

df = spark.createDataFrame([
    random.sample(string.ascii_letters, 1) + [random.random()]
    for _ in range(10000)
], ("user", "time"))

With aggregation:

(df.groupBy("user")
    .count()
    .groupBy(spark_partition_id().alias("partition"))
    .agg(F.sum("count"))
    .orderBy("partition")
    .show())

# +---------+----------+
# |partition|sum(count)|
# +---------+----------+
# |        0|       954|
# |        1|       605|
# |        2|      1150|
# |        3|      1339|
# |        4|       922|
# |        5|       751|
# |        6|       562|
# |        7|       579|
# |        8|      1440|
# |        9|       582|
# |       10|      1116|
# +---------+----------+

With window functions:

# Window definition from the question
windowSpec = Window.partitionBy("user").orderBy("time").rowsBetween(1, 1)
next_time = F.lead("time", 1).over(windowSpec)

(df
    .withColumn("next_time", next_time)
    # Ensure that the window is not removed from the execution plan
    .rdd.toDF()
    .groupBy(spark_partition_id().alias("partition"))
    .count()
    .orderBy("partition")
    .show())

# +---------+-----+
# |partition|count|
# +---------+-----+
# |        0|  954|
# |        1|  605|
# |        2| 1150|
# |        3| 1339|
# |        4|  922|
# |        5|  751|
# |        6|  562|
# |        7|  579|
# |        8| 1440|
# |        9|  582|
# |       10| 1116|
# +---------+-----+
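
If you want to estimate how big each shuffle partition will be for your own data, without running the full job, you can approximate the partitioner yourself. This is a rough sketch, assuming the exchange assigns each key to pmod(hash(user), spark.sql.shuffle.partitions), which matches the hash partitioning shown above:

from pyspark.sql import functions as F

# Number of shuffle partitions the window exchange will use
n = int(spark.conf.get("spark.sql.shuffle.partitions"))

# Approximate the shuffle partition each user hashes to, then sum the
# per-user row counts within each partition. A heavily skewed user shows
# up as one partition with a much larger total than the rest.
(df.groupBy("user")
    .count()
    .withColumn("partition", F.expr("pmod(hash(user), {})".format(n)))
    .groupBy("partition")
    .agg(F.sum("count").alias("rows"))
    .orderBy("partition")
    .show())

Note that hash partitioning does not try to balance partitions by size; whether a few big users land in the same partition is just a matter of how their keys hash.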

Upvotes: 0
