TMichel

Reputation: 4442

PySpark - optimize number of partitions after parquet read

In a parquet data lake partitioned by year and month, with spark.default.parallelism set to, e.g., 4, let's say I want to create a DataFrame comprising months 11~12 of 2017 and months 1~3 of 2018 from two sources, A and B.

df = spark.read.parquet(
    "A.parquet/_YEAR={2017}/_MONTH={11,12}",
    "A.parquet/_YEAR={2018}/_MONTH={1,2,3}",
    "B.parquet/_YEAR={2017}/_MONTH={11,12}",
    "B.parquet/_YEAR={2018}/_MONTH={1,2,3}",
)

When I check the number of partitions, I see that Spark used spark.default.parallelism as the default:

df.rdd.getNumPartitions()
Out[4]: 4

Taking into account that after creating df I need to perform join and groupBy operations over each period, and that data is more or less evenly distributed over each one (around 10 million rows per period):

Question

Upvotes: 2

Views: 1298

Answers (1)

Alper t. Turker

Reputation: 35229

Will a repartition improve the performance of my subsequent operations?

Typically it won't. The only reason to preemptively repartition data is to avoid further shuffling when the same Dataset is used for multiple joins based on the same condition.

If so, if I have 10 different periods (5 per year in both A and B), should I repartition by the number of periods and explicitly reference the columns to repartition (df.repartition(10,'_MONTH','_YEAR'))?

Let's go step-by-step:

  • should I repartition by the number of periods

    Partitioners don't guarantee a 1:1 relationship between levels and partitions. The only thing to remember is that you cannot have more non-empty partitions than unique keys, so using a significantly larger value doesn't make sense.

  • and explicitly reference the columns to repartition

    If you repartition and subsequently join or groupBy, using the same set of columns for both is the only sensible solution.
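To illustrate the point about non-empty partitions, here is a minimal pure-Python sketch. It is not Spark code: it assumes a simple "hash the key, take it modulo the partition count" scheme and uses CRC32 as a stand-in for the hash Spark actually applies, so the exact assignments will differ from a real cluster. The helper names `partition_for` and `non_empty_partitions` are made up for this example, and the keys are the five (_YEAR, _MONTH) combinations read in the question.

```python
import zlib


def partition_for(key, num_partitions):
    """Map a key to a partition index with a deterministic hash.

    CRC32 is only a stand-in here; Spark uses its own hash function.
    """
    digest = zlib.crc32(repr(key).encode("utf-8"))
    return digest % num_partitions


def non_empty_partitions(keys, num_partitions):
    """Count how many partitions receive at least one key."""
    return len({partition_for(key, num_partitions) for key in keys})


# The five (_YEAR, _MONTH) combinations read in the question.
periods = [(2017, 11), (2017, 12), (2018, 1), (2018, 2), (2018, 3)]

for requested in (5, 10, 200):
    used = non_empty_partitions(periods, requested)
    print(f"{requested} partitions requested, {used} non-empty")
```

However many partitions are requested, the non-empty count can never exceed the number of distinct keys, and two keys may still collide in the same partition even when the requested count equals the number of keys.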

Summary

Repartitioning before a join makes sense in two scenarios:

  • In case of multiple subsequent joins

    df_ = df.repartition(10, "foo", "bar")
    df_.join(df1, ["foo", "bar"])
    ...
    df_.join(df2, ["foo", "bar"])
    
  • With a single join, when the desired number of output partitions differs from spark.sql.shuffle.partitions (and there is no broadcast join)

    spark.conf.get("spark.sql.shuffle.partitions")
    # 200
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    
    df1_ = df1.repartition(11, "foo", "bar")
    df2_ = df2.repartition(11, "foo", "bar")
    
    df1_.join(df2_, ["foo", "bar"]).rdd.getNumPartitions()
    # 11
    
    df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
    # 200
    

    which might be preferable to temporarily changing the global setting:

    spark.conf.set("spark.sql.shuffle.partitions", 11)
    df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
    spark.conf.set("spark.sql.shuffle.partitions", 200)
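If you do go the set-and-restore route, one way to make it less error-prone is to wrap it in a context manager, so the previous value is put back even when the join raises. This is only a sketch: `temporary_conf` is a hypothetical helper, not part of PySpark, and it relies on nothing beyond `spark.conf.get` and `spark.conf.set`.

```python
from contextlib import contextmanager


@contextmanager
def temporary_conf(spark, key, value):
    """Set a Spark conf key inside a with-block, then restore the old value."""
    previous = spark.conf.get(key)  # remember the current setting
    spark.conf.set(key, value)
    try:
        yield spark
    finally:
        # Runs on normal exit and on exceptions alike.
        spark.conf.set(key, previous)


# Usage, assuming `spark`, `df1` and `df2` exist as in the snippets above:
#
# with temporary_conf(spark, "spark.sql.shuffle.partitions", 11):
#     df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
```

As far as I understand, the setting is read when the query is planned rather than when `join` is called, so whatever triggers the plan (here `.rdd.getNumPartitions()`) should run inside the `with` block.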
    

Upvotes: 3
