Reputation: 257
I'm writing a Parquet file from a DataFrame to S3. In the Spark UI, I can see that all but one of the tasks in the write stage complete quickly (e.g. 199/200). The last task appears to take forever to complete, and very often it fails because it exceeds the executor memory limit.
I'd like to know what is happening in this last task, and how I can optimize it. Thanks.
Upvotes: 14
Views: 13842
Reputation: 2452
I tried Glennie Helles Sindholt's solution and it works very well. Here is the code:
path = 's3://...'
n = 2  # number of partitions after repartitioning; try 2 to test
spark_df = spark_df.repartition(n)
spark_df.write.mode("overwrite").parquet(path)
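Before choosing n, it may help to check how unevenly the rows are currently spread across partitions. The following is only a sketch: the helper names partition_row_counts and skew_ratio are made up for illustration, and the first helper assumes spark_df is a PySpark DataFrame.

```python
from statistics import median


def partition_row_counts(df):
    """Return {partition_id: row_count} for a PySpark DataFrame (hypothetical helper)."""
    # Imported here so that skew_ratio below can be used without Spark installed.
    from pyspark.sql import functions as F

    rows = (
        df.groupBy(F.spark_partition_id().alias("partition_id"))
        .count()
        .collect()
    )
    return {row["partition_id"]: row["count"] for row in rows}


def skew_ratio(counts):
    """Largest partition size divided by the median partition size."""
    sizes = list(counts.values())
    return max(sizes) / median(sizes)


# Made-up counts standing in for the result of partition_row_counts(spark_df):
example_counts = {0: 50, 1: 50, 2: 900}
print(skew_ratio(example_counts))
```

A ratio far above 1 suggests that one partition holds much more data than the others, which would match the single slow task in the question. Empty partitions do not appear in the grouped result, so every count is at least 1.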
Upvotes: 11
Reputation: 299
As others have noted, data skew is likely at play.
Besides that, I noticed that your task count is 200.
The configuration parameter spark.sql.shuffle.partitions configures the number of partitions that are used when shuffling data for joins or aggregations. 200 is the default for this setting, but generally it is far from an optimal value.
For small data, 200 could be overkill and you would waste time in the overhead of multiple partitions.
For large data, 200 can result in large partitions, which should be broken down into more, smaller partitions.
The really rough rules of thumb are:
- Have 2-3x as many partitions as CPU cores.
- Or aim for roughly 128 MB per partition.
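The two rules of thumb above can be turned into a quick calculation. This is a rough sketch, not an official formula: the function name suggest_partition_count is made up, and taking the larger of the two estimates is my own assumption about how to combine them.

```python
import math

TARGET_PARTITION_BYTES = 128 * 1024 * 1024  # ~128 MB per partition, from the rule of thumb


def suggest_partition_count(total_bytes, num_cores, partitions_per_core=3):
    """Return the larger of the core-based and the size-based partition estimates."""
    by_cores = num_cores * partitions_per_core
    by_size = math.ceil(total_bytes / TARGET_PARTITION_BYTES)
    # Taking the max is an assumption: it keeps every core busy and
    # keeps partitions at or below the target size.
    return max(1, by_cores, by_size)


# Hypothetical job: 100 GiB of shuffled data on a cluster with 32 cores.
print(suggest_partition_count(100 * 1024**3, 32))
```

The result is only a starting point for spark.sql.shuffle.partitions or repartition(n); the Spark UI is still the place to check how the tasks actually behave.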
2 GB is the maximum partition size. Also, if you are hovering just below 2000 partitions, be aware that Spark uses a different data structure for shuffle bookkeeping when the number of partitions is greater than 2000 [1]:
private[spark] object MapStatus {
  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > 2000) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }
...
You can try playing with this parameter at runtime:
spark.conf.set("spark.sql.shuffle.partitions", "300")
Upvotes: 3
Reputation: 1594
This article, "The Bleeding Edge: Spark, Parquet and S3", has a lot of useful information about Spark, S3 and Parquet. In particular, it talks about how the driver ends up writing out the _common_metadata_ files, which can take quite a bit of time. There is a way to turn it off.
Unfortunately, the authors say that they go on to generate the common metadata themselves, but they don't really explain how they did so.
Upvotes: 2
Reputation: 13154
It sounds like you have a data skew. You can fix this by calling repartition on your DataFrame before writing to S3.
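To illustrate why one task out of 200 can end up with most of the data, here is a toy simulation in plain Python. It is not Spark itself. It assumes rows are assigned to partitions by hash(key) % num_partitions, as after a shuffle on a key, and that repartition(n) without column arguments spreads rows evenly in round-robin fashion. The keys and row counts are made up.

```python
from collections import Counter


def hash_partition_sizes(keys, num_partitions):
    """Rows per partition when each row goes to hash(key) % num_partitions."""
    counts = Counter(hash(k) % num_partitions for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]


def round_robin_partition_sizes(num_rows, num_partitions):
    """Rows per partition when rows are dealt out evenly, one partition at a time."""
    base, extra = divmod(num_rows, num_partitions)
    return [base + (1 if p < extra else 0) for p in range(num_partitions)]


# Made-up data: 100,000 rows, 90,000 of which share the "hot" key 7.
keys = [7] * 90_000 + list(range(10_000))

skewed = hash_partition_sizes(keys, 200)
even = round_robin_partition_sizes(len(keys), 200)

print("hash-partitioned: max", max(skewed), "min", min(skewed))
print("round-robin:      max", max(even), "min", min(even))
```

In the hash-partitioned case, a single partition receives nearly all the rows while the rest stay tiny, which is the pattern behind 199 fast tasks and one slow one. After an even redistribution, every partition holds the same number of rows.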
Upvotes: 7