WestCoastProjects

Reputation: 63022

Spark dataframe saveAsTable is using a single task

We have a pipeline for which the initial stages are properly scalable - using several dozen workers apiece.

One of the last stages is

dataFrame.write.format(outFormat).mode(saveMode)
  .partitionBy(partColVals.map(_._1): _*)
  .saveAsTable(tname)

For this stage we end up with a single worker. This clearly does not work for us - in fact the worker runs out of disk space - on top of being very slow.


Why would that command end up running on a single worker/single task only?

Update: The output format was Parquet. The number of partition columns did not affect the result (one column was tried, as well as several).

Another update: None of the conditions posited in the answer below held.

Upvotes: 4

Views: 3185

Answers (1)

zero323

Reputation: 330063

The problem is unlikely to be related in any way to saveAsTable.

A single task in a stage indicates that the input data (Dataset or RDD) has only one partition. This is in contrast to cases where there are multiple tasks but one or more of them have significantly higher execution time, which normally corresponds to partitions containing positively skewed keys. You should also not conflate a single-task scenario with low CPU utilization; the latter is usually a result of insufficient IO throughput (high CPU wait times are the most obvious indication of that), but in rare cases it can be traced to the use of shared objects with low-level synchronization primitives.

Since standard data sources don't shuffle data on write (including cases where the partitionBy and bucketBy options are used), it is safe to assume that the data was repartitioned somewhere in the upstream code. Usually that means one of the following happened:

  • Data has been explicitly moved to a single partition using coalesce(1) or repartition(1) (see the sketch after this list).
  • Data has been implicitly moved to a single partition, for example with:

    • Dataset.limit
    • Window function applications with window definition lacking PARTITION BY clause.

      // Without PARTITION BY, every row goes through a single partition.
      import org.apache.spark.sql.expressions.Window
      import org.apache.spark.sql.functions.row_number

      df.withColumn(
        "row_number",
        row_number().over(Window.orderBy("some_column"))
      )
      
    • The spark.sql.shuffle.partitions option is set to 1 and the upstream code includes a non-local (shuffling) operation on a Dataset.

    • The Dataset is the result of applying a global aggregate function (without a GROUP BY clause). This is usually not an issue, unless the function is non-reducing (collect_list or comparable).
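
For illustration, here is a minimal sketch of the explicit case in the first bullet above. It is runnable in spark-shell; the DataFrame df and the column name date are made up for the example:

import org.apache.spark.sql.functions.col

// Hypothetical input with a candidate partition column "date".
val df = spark.range(0, 1000000L).selectExpr("id", "cast(id % 30 as string) as date")

// coalesce(1) collapses the data into one partition, so the write that follows
// runs as a single task.
println(df.coalesce(1).rdd.getNumPartitions)   // 1

// Repartitioning by the partition column(s) before the write keeps it parallel.
val parallel = df.repartition(200, col("date"))
println(parallel.rdd.getNumPartitions)         // 200
// parallel.write.format("parquet").partitionBy("date").saveAsTable("tname")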

While there is no evidence that it is the problem here, in the general case you should also consider the possibility that the data contains only a single partition all the way back to the source. This usually happens when input is fetched using the JDBC source, but third-party formats can exhibit the same behavior.
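
Likewise, a sketch of the JDBC case (the URL, table name, and bounds are hypothetical):

// Without partitioning options the whole table is read over one connection,
// i.e. into a single partition.
val single = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/mydb")
  .option("dbtable", "events")
  .load()

// Supplying a partition column plus bounds lets Spark issue numPartitions parallel queries.
val parallel = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/mydb")
  .option("dbtable", "events")
  .option("partitionColumn", "id")
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "16")
  .load()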

To identify the source of the problem you should either check the execution plan for the input Dataset (explain(true)) or check the SQL tab of the Spark Web UI.
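
For example (with df standing in for whatever Dataset is being written):

// How many partitions does the data actually have just before the write?
println(df.rdd.getNumPartitions)

// Prints the parsed, analyzed, optimized and physical plans;
// look for nodes such as Coalesce(1) or Exchange SinglePartition.
df.explain(true)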

Upvotes: 3
