Reputation: 63022
We have a pipeline for which the initial stages are properly scalable - using several dozen workers apiece.
One of the last stages is
dataFrame.write.format(outFormat).mode(saveMode).
partitionBy(partColVals.map(_._1): _*).saveAsTable(tname)
For this stage we end up with a single worker. This clearly does not work for us - in fact the worker runs out of disk space - on top of being very slow.
Why would that command end up running on a single worker/single task only?
Update The output format was parquet. The number of partition columns did not affect the result (tried one column as well as several columns).
Another update None of the following conditions (as posited by an answer below) held:
- coalesce or partitionBy statements
- window / analytic functions
- Dataset.limit
- sql.shuffle.partitions (see the check below)
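For reference, that last condition can be verified directly from the session configuration; a minimal check, not part of the original question:

spark.conf.get("spark.sql.shuffle.partitions")  // default is "200"; a value of "1" would force single-task shuffles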
Upvotes: 4
Views: 3185
Reputation: 330063
The problem is unlikely to be related in any way to saveAsTable.
A single task in a stage indicates that the input data (Dataset or RDD) has only a single partition. This is in contrast to cases where there are multiple tasks but one or more have significantly higher execution time, which normally correspond to partitions containing positively skewed keys. Also, you should not confound a single task scenario with low CPU utilization. The latter is usually a result of insufficient IO throughput (high CPU wait times are the most obvious indication of that), but in rare cases it can be traced to usage of shared objects with low level synchronization primitives.
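A quick way to confirm this, assuming dataFrame refers to the Dataset passed to the write (a minimal check, not from the original answer):

val inputPartitions = dataFrame.rdd.getNumPartitions
println(s"partitions before write: $inputPartitions")  // 1 would confirm the single-task behavior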
Since standard data sources don't shuffle data on write (including cases where partitionBy and bucketBy options are used), it is safe to assume that the data has been repartitioned somewhere in the upstream code. Usually it means that one of the following happened:
- Data has been explicitly moved to a single partition with coalesce(1) or repartition(1).
- Data has been implicitly moved to a single partition, for example with:
  - Dataset.limit
  - window function applications with a window definition lacking a PARTITION BY clause, for example:
df.withColumn(
"row_number",
row_number().over(Window.orderBy("some_column"))
)
  - the sql.shuffle.partitions option set to 1, combined with upstream code that includes a non-local operation on a Dataset
  - a Dataset that is the result of applying a global aggregate function (without a GROUP BY clause); this is usually not an issue, unless the function is non-reducing (collect_list or comparable)

While there is no evidence that it is the problem here, in the general case you should also consider the possibility that the data contains only a single partition all the way to the source. This is usually the case when input is fetched using the JDBC source, but 3rd party formats can exhibit the same behavior.
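If a JDBC source does turn out to be the culprit, the read itself can be parallelized by supplying partitioning options. A sketch only, where spark, jdbcUrl, the table name and the partition column are placeholders rather than anything from the question:

val parallelInput = spark.read
  .format("jdbc")
  .option("url", jdbcUrl)              // placeholder connection URL
  .option("dbtable", "source_table")   // placeholder table name
  .option("partitionColumn", "id")     // must be a numeric, date or timestamp column
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "48")       // splits the read into 48 parallel tasks
  .load()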
To identify the source of the problem you should either check the execution plan for the input Dataset (explain(true)) or check the SQL tab of the Spark Web UI.
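If the plan does confirm a single upstream partition, one possible remedy is to restore parallelism right before the write. This is only a sketch reusing the variable names from the question, and it assumes the partition columns have enough distinct values to spread the data:

import org.apache.spark.sql.functions.col

dataFrame
  .repartition(partColVals.map(c => col(c._1)): _*)  // or repartition(n, cols: _*) for an explicit partition count
  .write.format(outFormat).mode(saveMode)
  .partitionBy(partColVals.map(_._1): _*)
  .saveAsTable(tname)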
Upvotes: 3