Infinity

Reputation: 33

Spark AQE: dynamically coalescing partitions after a filter, without an extra shuffle

I have the following function:

from typing import List, Optional

from pyspark.sql import DataFrame
from pyspark.sql import functions as f


def calculate_delta(old_df: DataFrame,
                    new_df: DataFrame,
                    id_keys: List[str],
                    deletion_field_name: str,
                    excluded_fields: Optional[List[str]] = None) -> DataFrame:
    if excluded_fields:
        old_df = old_df.drop(*excluded_fields)
    # keep only the columns present in both frames, in new_df's order,
    # so that both structs have the same field layout
    shared_columns = [c for c in new_df.columns if c in old_df.columns]
    new_df = new_df.select(*id_keys, f.struct(*shared_columns).alias("new_struct"))
    old_df = old_df.select(*id_keys, f.struct(*shared_columns).alias("old_struct"))

    # full outer join on the keys, then keep only added, deleted, or modified rows
    return new_df.join(old_df, on=id_keys, how="outer") \
        .filter("old_struct IS NULL OR new_struct IS NULL OR new_struct != old_struct") \
        .withColumn("output_struct", f.when(f.col("new_struct").isNull(), f.col("old_struct")).otherwise(f.col("new_struct")))\
        .selectExpr(f"(new_struct IS NULL) as {deletion_field_name}",
                    "output_struct.*")

The function takes two versions of a dataframe and keeps only the new, modified, and deleted rows. After the join there are around 150M rows. The row count after the filter varies a lot: it can be anywhere from 50 rows to 50M rows, depending on what changed that day.
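To make the intended behaviour concrete, here is a minimal pure-Python sketch of the same keep-only-changes logic on plain dicts instead of DataFrames. The names `delta_rows` and `is_deleted` are made up for illustration, and the sketch ignores Spark's NULL comparison semantics.

```python
from typing import Dict, List

Row = Dict[str, object]


def delta_rows(old: Dict[int, Row],
               new: Dict[int, Row],
               deletion_field_name: str = "is_deleted") -> List[Row]:
    """Return added, modified, and deleted rows, keyed by an integer id."""
    result: List[Row] = []
    # The union of keys plays the role of the full outer join.
    for key in sorted(set(old) | set(new)):
        old_row = old.get(key)
        new_row = new.get(key)
        # Rows present on both sides with identical content are filtered out.
        if old_row is not None and new_row is not None and old_row == new_row:
            continue
        deleted = new_row is None
        # Deleted rows carry the old values, everything else the new values.
        payload = old_row if deleted else new_row
        result.append({deletion_field_name: deleted, **payload})
    return result


old = {1: {"id": 1, "v": "a"}, 2: {"id": 2, "v": "b"}, 3: {"id": 3, "v": "c"}}
new = {1: {"id": 1, "v": "a"}, 2: {"id": 2, "v": "B"}, 4: {"id": 4, "v": "d"}}
changes = delta_rows(old, new)
```

Here id 1 is unchanged and dropped, id 2 is kept as modified, id 3 is kept and flagged as deleted, and id 4 is kept as new.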

I set "spark.sql.shuffle.partitions": "720".

The problem is that after calling this function I write the data to files, and Spark always tries to write 720 files (sometimes fewer, when some partitions are empty), which is a lot. I'd like to use something like AQE dynamic coalescing to reduce the number of partitions, and therefore the number of written files, dynamically. I can't coalesce / repartition to a static number because the output size varies: sometimes there are 50 modified / deleted rows and sometimes there are 50M. I also tried repartitioning by the id keys before writing to force AQE to coalesce dynamically, and it seems to work, but I really want to avoid that solution because it adds a redundant shuffle.

So basically, what I want is to reduce the number of output files dynamically, if possible. Any suggestions?

I'm using Spark 3.3.

Thanks

Upvotes: 2

Views: 287

Answers (1)

mamonu

Reputation: 703

Two settings are relevant in your case. Set spark.sql.adaptive.coalescePartitions.enabled to true, and define spark.sql.adaptive.coalescePartitions.minPartitionNum, which sets the suggested minimum number of partitions after coalescing. Both only take effect when spark.sql.adaptive.enabled is on, which is the default since Spark 3.2. Set minPartitionNum to a value that suits your use case.
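As a rough sketch, the settings could be collected in one place and applied to the session before the write. The helper name `apply_settings` and the concrete values (including the 128 MB advisory size and a minimum of 1) are assumptions for illustration, not recommendations; `spark` is expected to be an existing SparkSession.

```python
from typing import Any, Dict

# Illustrative values only; tune them for your own workload.
AQE_COALESCE_SETTINGS: Dict[str, str] = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # Target size AQE aims for when merging small shuffle partitions (bytes).
    "spark.sql.adaptive.advisoryPartitionSizeInBytes": str(128 * 1024 * 1024),
    # Suggested lower bound on the number of partitions after coalescing.
    "spark.sql.adaptive.coalescePartitions.minPartitionNum": "1",
    # Initial number of shuffle partitions, as in the question.
    "spark.sql.shuffle.partitions": "720",
}


def apply_settings(spark: Any, settings: Dict[str, str]) -> None:
    """Apply each key/value pair through spark.conf.set."""
    for key, value in settings.items():
        spark.conf.set(key, value)
```

Usage would be `apply_settings(spark, AQE_COALESCE_SETTINGS)` before triggering the write.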

I think in your case it would be useful to calculate the minimum number of partitions dynamically before writing to disk. A proposed solution:

from pyspark.sql import DataFrame


def get_optimal_min_partitions(df: DataFrame, target_size_per_partition: int = 128 * 1024 * 1024) -> int:
    """
    Get the optimal minimum number of partitions based on the estimated dataframe size.
    :param df: The dataframe
    :param target_size_per_partition: The target size per partition in bytes. Default is 128MB.
    :return: The optimal minimum number of partitions.
    """
    # Ask the optimizer for its size estimate of the plan, in bytes.
    # Note: this goes through the internal _jdf handle and is only a planner
    # estimate; after a join and filter it can be far from the real size.
    size_in_bytes = df._jdf.queryExecution().optimizedPlan().stats().sizeInBytes()
    total_size = int(str(size_in_bytes))  # Scala BigInt -> Python int

    # Calculate the optimal minimum number of partitions (at least 1)
    optimal_min_partitions = max(1, total_size // target_size_per_partition)

    return optimal_min_partitions
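The arithmetic behind that calculation can be checked on its own, without Spark. The helper below is a hypothetical sketch (the name `partitions_for_size` is not part of any API); unlike the truncating division above, it rounds up so that no partition exceeds the target size.

```python
def partitions_for_size(total_bytes: int,
                        target_bytes: int = 128 * 1024 * 1024) -> int:
    """Number of partitions needed so each holds at most target_bytes."""
    if target_bytes <= 0:
        raise ValueError("target_bytes must be positive")
    # Integer ceiling division, with a floor of one partition.
    return max(1, (total_bytes + target_bytes - 1) // target_bytes)


small_day = partitions_for_size(10 * 1024)       # about 10 KiB of changes
large_day = partitions_for_size(6 * 1024 ** 3)   # about 6 GiB of changes
```

With the default 128 MB target, a tiny delta collapses to a single partition, while 6 GiB maps to 48 partitions.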

Once you have calculated the minimum number of partitions from the size of the delta dataframe, set minPartitionNum to that value and then write to disk.

from typing import List, Optional

from pyspark.sql import DataFrame, SparkSession


def calculate_and_write_delta(old_df: DataFrame,
                              new_df: DataFrame,
                              id_keys: List[str],
                              deletion_field_name: str,
                              output_path: str,
                              excluded_fields: Optional[List[str]] = None) -> None:
    # reuse the already running session
    spark = SparkSession.builder.getOrCreate()

    # calculate the delta
    delta_df = calculate_delta(old_df, new_df, id_keys, deletion_field_name, excluded_fields)

    # calculate optimal minimum partitions
    optimal_min_partitions = get_optimal_min_partitions(delta_df)
    spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", str(optimal_min_partitions))

    # write the data
    delta_df.write.parquet(output_path)

Upvotes: 0
