Nipun Wijerathne

Reputation: 1829

Very Slow Broadcast Join in Spark, Seemingly Due to Partition Skew

I'm running a broadcast join on a Spark standalone cluster with 56 executors (7 cores and 44 GiB of memory each).

from pyspark.sql.functions import broadcast

res = df_chunk.join(
    broadcast(df_subset),
    on=(
        (df_chunk["__file_id__"] == df_subset["file_id"])
        & (df_chunk["__row_id__"] == df_subset["row_id"])
    ),
    how="inner",
)
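For reference, a quick way to confirm the broadcast hint actually takes effect is to inspect the physical plan; it should contain a BroadcastHashJoin node rather than a SortMergeJoin:

# Print the physical plan; a BroadcastHashJoin node means the broadcast
# hint took effect, while a SortMergeJoin would mean Spark fell back to a shuffle.
res.explain()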

However, the join is divided into 3920 tasks; 3910 of them complete within a second, while the remaining 10 take around 10 minutes.

[Stage 11:===================================================>(3911 + 9) / 3920]

This can also be confirmed in the Spark UI:

[screenshot: Spark UI stage view showing a few long-running tasks]

Also, it seems that out of the 56 executors, only around 8 are actually running the join.

[screenshot: Spark UI executors page showing only ~8 active executors]

Since this is a broadcast join, I don't think data skew should be the issue.
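That said, even with a broadcast join, uneven partition sizes on the large (non-broadcast) side translate directly into uneven task durations. As a quick sanity check (a sketch against df_chunk above), the row count per partition can be inspected:

from pyspark.sql.functions import spark_partition_id

# Count rows per partition of the stream side; a handful of oversized
# partitions here would explain a handful of long-running tasks.
df_chunk.groupBy(spark_partition_id().alias("partition_id")) \
    .count() \
    .orderBy("count", ascending=False) \
    .show(20)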

What could be the reasons for this under-utilization of executors?

Update

I'm using the following code to divide my large dataset into subsets so that it can run on my infrastructure. It appears that the number of tasks that actually do work is always equal to the chunk_size, and each file in a chunk is a ~3 GB .parquet file.


from typing import List

from pyspark.sql import DataFrame


def merge_dataframes(data_frames: List[DataFrame]) -> DataFrame:
    """Merge a list of DataFrames vertically (row-wise union)."""
    if len(data_frames) < 2:
        return data_frames[0]

    # Union the DataFrames one by one; assumes identical schemas.
    _df = data_frames.pop(0)
    while data_frames:
        _df = _df.union(data_frames.pop(0))

    return _df
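(As an aside, the same vertical merge can be written more compactly with functools.reduce; the behavior should be identical, assuming all DataFrames share the same schema:)

from functools import reduce
from pyspark.sql import DataFrame

# Fold union over the list; equivalent to the loop above.
merged = reduce(DataFrame.union, data_frames)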

from pyspark.sql.functions import lit

file_ids = []
to_chunk = []
for file in chunk:
    file_id = FIS.get_file_index(file)

    # Tag every row with the id of the file it came from.
    df_file = spark.read.parquet(file)
    df_file = df_file.withColumn('__file_id__', lit(file_id))
    to_chunk.append(df_file)
    file_ids.append(file_id)

# Build the chunk and the matching subset once all files are read.
df_chunk = merge_dataframes(to_chunk).repartition(NUM_CORES)
df_subset = df.filter(df.file_id.isin(file_ids)).repartition(NUM_CORES)
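One variant I'm considering (an untested sketch on my part) is to repartition by the join keys rather than only by a partition count, so that rows from a single ~3 GB file are spread across all partitions instead of clustering together:

# Untested variant: hash-partition both sides by the join keys so one
# file's rows are distributed across NUM_CORES partitions.
df_chunk = merge_dataframes(to_chunk).repartition(NUM_CORES, '__file_id__', '__row_id__')
df_subset = df.filter(df.file_id.isin(file_ids)).repartition(NUM_CORES, 'file_id', 'row_id')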

Upvotes: 3

Views: 196

Answers (0)
