I'm running a broadcast join in a Spark standalone cluster with 56 executors (7 cores and 44 GiB of memory each).
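For reference, this is roughly how the session is configured. The property names are standard Spark settings and the numbers mirror the cluster description above, but the app name and the exact builder calls are only an illustrative sketch:

from pyspark.sql import SparkSession

# Standalone cluster: the executor count comes from cores.max / executor.cores
# (392 / 7 = 56 executors), matching the setup described above.
spark = (
    SparkSession.builder
    .appName("broadcast-join-chunks")  # placeholder name
    .config("spark.executor.cores", "7")
    .config("spark.executor.memory", "44g")
    .config("spark.cores.max", "392")
    .getOrCreate()
)

The join itself looks like this: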
from pyspark.sql.functions import broadcast

# Broadcast the small side and join on the composite (file_id, row_id) key.
res = df_chunk.join(
    broadcast(df_subset),
    on=(
        (df_chunk["__file_id__"] == df_subset["file_id"])
        & (df_chunk["__row_id__"] == df_subset["row_id"])
    ),
    how="inner",
)
However, the join operation seems to be divided into 3920 tasks; 3910 of them complete within a second, while the remaining 10 take around 10 minutes to complete:
[Stage 11:===================================================>(3911 + 9) / 3920]
This can also be confirmed in the Spark UI.
It also seems that, out of the 56 executors, only around 8 are actually running the join.
Since this is a broadcast join, I don't think data skew can be the issue.
What could be the reasons for this under-utilization of executors?
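For context, in a broadcast join the task count of the join stage follows the partition count of the non-broadcast side, so this is the quick check I'd run; getNumPartitions is the standard RDD call and the DataFrame names are the ones from the snippet above:

# How many partitions (and therefore join-stage tasks) each side contributes;
# the broadcast side is replicated, so df_chunk's partitioning drives the stage.
print("df_chunk partitions: ", df_chunk.rdd.getNumPartitions())
print("df_subset partitions:", df_subset.rdd.getNumPartitions())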
Update
I'm using the following code to divide my large dataset into subsets so that I can run it on my infrastructure. It appears that the number of tasks that actually do the work is always equal to the chunk_size, and each file in the chunk is a roughly 3 GB .parquet file.
from typing import List
from pyspark.sql import DataFrame

def merge_dataframes(data_frames: List[DataFrame]) -> DataFrame:
    """
    Merge a list of DataFrames vertically (union them one after another).
    """
    if len(data_frames) < 2:
        return data_frames[0]
    _df = data_frames.pop(0)
    while data_frames:
        _df = _df.union(data_frames.pop(0))
    return _df
from pyspark.sql.functions import lit

to_chunk = []
file_ids = []
for file in chunk:
    file_id = FIS.get_file_index(file)
    df_file = spark.read.parquet(file)
    df_file = df_file.withColumn('__file_id__', lit(file_id))
    to_chunk.append(df_file)
    file_ids.append(file_id)

df_chunk = merge_dataframes(to_chunk).repartition(NUM_CORES)
df_subset = df.filter(df.file_id.isin(file_ids)).repartition(NUM_CORES)
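To see how the rows actually end up spread across partitions after the repartition (and therefore how many tasks have real work to do), here is a small sketch using spark_partition_id; the __pid__ column name is just a throwaway label:

from pyspark.sql.functions import spark_partition_id

# Count rows per partition; if only a handful of partitions hold rows,
# only a handful of tasks (and hence executors) do any real work.
(df_chunk
    .withColumn("__pid__", spark_partition_id())
    .groupBy("__pid__")
    .count()
    .orderBy("count", ascending=False)
    .show(20))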