user2680514

Reputation: 257

Spark write Parquet to S3 the last task takes forever

I'm writing a Parquet file from a DataFrame to S3. When I look at the Spark UI, I can see that all but one of the tasks in the write stage complete swiftly (e.g. 199/200). This last task appears to take forever to complete, and very often it fails due to exceeding the executor memory limit.

I'd like to know what is happening in this last task. How can I optimize it? Thanks.

Upvotes: 14

Views: 13842

Answers (4)

bcosta12

Reputation: 2452

I have tried Glennie Helles Sindholt's solution and it works very well. Here is the code:

path = 's3://...'
n = 2  # target number of partitions, try 2 to test
spark_df = spark_df.repartition(n)
spark_df.write.mode("overwrite").parquet(path)
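For what it's worth, a short note on why repartition (rather than coalesce) is the right call here, using the same variables as above:

# repartition(n) does a full shuffle that spreads rows evenly across n partitions;
# coalesce(n) only merges existing partitions and will not break up a skewed one.
spark_df = spark_df.repartition(n)   # full shuffle, evenly sized partitions
# spark_df = spark_df.coalesce(n)    # cheaper, but keeps the skew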

Upvotes: 11

Ryan

Reputation: 299

As others have noted, data skew is likely at play.

Besides that, I noticed that your task count is 200.

The configuration parameter spark.sql.shuffle.partitions configures the number of partitions that are used when shuffling data for joins or aggregations.

200 is the default for this setting, but generally it is far from an optimal value.

For small data, 200 could be overkill, and you would waste time on the overhead of managing that many partitions.

For large data, 200 can result in large partitions, which should be broken down into more, smaller partitions.

The really rough rules of thumb are:

- have 2-3x as many partitions as CPU cores, or
- aim for ~128 MB per partition.
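To make those rules concrete, a quick sketch (the data size and core count below are assumed example values, not taken from the question):

# Back-of-the-envelope sizing; the 20 GB and 16 cores are made-up example numbers.
data_size_bytes = 20 * 1024**3          # estimated size of the data being shuffled
target_partition_bytes = 128 * 1024**2  # ~128 MB per partition
total_cores = 16                        # cores available to the application

by_size = data_size_bytes // target_partition_bytes  # 160 partitions
by_cores = 3 * total_cores                           # 48 partitions (3x cores)
num_partitions = max(by_size, by_cores)

spark.conf.set("spark.sql.shuffle.partitions", str(num_partitions))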

2 GB is the maximum partition size. Also, if you are hovering just below 2000 partitions, it is worth knowing that Spark uses a different, more compact data structure for shuffle bookkeeping when the number of partitions is greater than 2000 [1]:

private[spark] object MapStatus {

  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > 2000) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }
...

You can try playing with this parameter at runtime:

spark.conf.set("spark.sql.shuffle.partitions", "300")
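One caveat worth spelling out, as a sketch (df1, df2, "id" and path are placeholder names, not from the question):

spark.conf.set("spark.sql.shuffle.partitions", "300")

# The setting only affects operations that actually shuffle (joins, groupBy, etc.);
# a plain df.write does not introduce a shuffle, so combine it with repartition()
# if the write itself is the slow part.
result = df1.join(df2, "id").groupBy("id").count()   # this shuffle uses 300 partitions
result.write.mode("overwrite").parquet(path)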

[1] What should be the optimal value for spark.sql.shuffle.partitions or how do we increase partitions when using Spark SQL?

Upvotes: 3

retnuH

Reputation: 1594

This article, The Bleeding Edge: Spark, Parquet and S3, has a lot of useful information about Spark, S3 and Parquet. In particular, it talks about how the driver ends up writing out the _common_metadata files and how this can take quite a bit of time. There is a way to turn it off.

Unfortunately, they say that they went on to generate the common metadata themselves, but don't really describe how they did so.
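For anyone looking for the switch, here is a sketch of how it was commonly disabled; the exact property is an assumption that depends on your Spark/Parquet version, so double-check it:

from pyspark.sql import SparkSession

# Assumed switch: on the Spark/parquet-mr versions of that era, the Hadoop-side
# property "parquet.enable.summary-metadata" controlled the _metadata /
# _common_metadata summary files; verify the key for your version.
spark = (
    SparkSession.builder
    .appName("parquet-write")
    .config("spark.hadoop.parquet.enable.summary-metadata", "false")
    .getOrCreate()
)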

Upvotes: 2

Glennie Helles Sindholt

Reputation: 13154

It sounds like you have data skew. You can fix this by calling repartition on your DataFrame before writing to S3.
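If you want to confirm the skew first, a quick sketch (spark_df stands for the DataFrame being written):

from pyspark.sql.functions import spark_partition_id

# If one partition holds the bulk of the rows, that is the task that drags on.
(spark_df
    .withColumn("partition_id", spark_partition_id())
    .groupBy("partition_id")
    .count()
    .orderBy("count", ascending=False)
    .show(10))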

Upvotes: 7
