Callum Dempsey Leach

Reputation: 51

How do you choose the right spark.sql.shuffle.partitions size for Spark Structured Streaming?

I’m working with Spark Structured Streaming, specifically with Databricks Autoloader, to process millions of small records that land in our S3 buckets and index them into Delta Lake tables. In total there are about 3 billion files, closing in on 500 TB of data, to churn through. I'd like to share my experience: I've noticed that the default shuffle partition count doesn't seem to scale well here. It leads to excessive context switching and performance bottlenecks when processing large volumes of individual files.

Here's my example, and please keep me honest on the details: I have 3 "r4.2xlarge" instances, which gives 8 cores on the executors, weighing in at 16 threads. We are running FileScanRDD from the binaryFiles streamer. Each record is about 128 KB, and for this small sample we are consuming only a few million records at once in Spark Structured Streaming; the ingestion goal is 300 million.

[Screenshot: the Spark UI task we get stuck on, with the number of tasks (Job ID 2) overwhelming the cluster]

In my experience, adjusting the spark.sql.shuffle.partitions configuration away from the default of 200 was one way to improve performance. When I lowered it to 3x the number of cores, I observed much better processing rates. I believe the issue is task saturation: the default creates too many tasks and too much context switching for the cluster.

[Screenshot: the improvement after changing the setting]
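For concreteness, this is roughly how I set it (a simplified sketch; the 3x multiplier is just what worked for me, not a universal rule):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # defaultParallelism normally reports the total task slots across the executors.
    total_cores = spark.sparkContext.defaultParallelism

    # ~3x the core count worked far better for me than the default of 200.
    spark.conf.set("spark.sql.shuffle.partitions", str(3 * total_cores))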

However, trying to change this value from the default was difficult. There's a catch: the spark.sql.shuffle.partitions setting is persisted in the stream's checkpoint, which means that even after changing it, the stream still uses the old value for stateful aggregations unless I create a new checkpoint. A few sources confirmed this: first, my own measurement of numPartitions on the long-running task in the Spark UI, which frustratingly wouldn't change from 200 when I tried to resize the Scan Binary File stage; then the Spark docs; and a Databricks Community forum post.
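A simplified sketch of the restart that finally picked up the new value (the bucket paths here are placeholders, not my real locations):

    # Restart the Auto Loader stream against a NEW checkpoint so the changed
    # spark.sql.shuffle.partitions value is actually honoured.
    df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "binaryFile")   # we ingest raw files
        .load("s3://my-bucket/landing/")             # placeholder source path
    )

    (
        df.writeStream
        .format("delta")
        # Reusing the old checkpoint keeps the old partition count for stateful
        # operators; a fresh location picks up the new setting.
        .option("checkpointLocation", "s3://my-bucket/checkpoints/ingest_v2/")
        .start("s3://my-bucket/tables/raw_files/")   # placeholder Delta table path
    )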

So, given that shuffle size can't be changed once set, how can I determine the optimal spark.sql.shuffle.partitions for a streaming job?

Should I stick with the default of 200 partitions, especially when using more cores? I'm concerned that a lower shuffle size may limit scalability and under-utilize resources when scaling the cluster. Conversely, I am concerned the default may not fit my workload and would require a more expensive, larger cluster. What are the best practices for tuning this parameter to balance minimising context switching and maintaining scalability in production workloads?

Upvotes: 0

Views: 282

Answers (2)

Yash Kothari

Reputation: 36

There might be slowness due to memory management when reading and processing the data. How are we reading the files? Is there a trigger provided (e.g., a specific processing time or "once" trigger)?

Try using "availableNow" with Auto Loader, where we can process data across multiple micro-batches with rate limiting, giving more control over how many files or bytes to process in each batch.
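A rough sketch of what that can look like (the option values and paths below are illustrative placeholders, not recommendations):

    # `spark` is the active SparkSession (predefined on Databricks).
    # Auto Loader with rate limiting and an availableNow trigger.
    df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "binaryFile")
        .option("cloudFiles.maxFilesPerTrigger", 10000)   # cap files per micro-batch
        .option("cloudFiles.maxBytesPerTrigger", "10g")   # cap bytes per micro-batch
        .load("s3://my-bucket/landing/")                  # placeholder source path
    )

    (
        df.writeStream
        .format("delta")
        .option("checkpointLocation", "s3://my-bucket/checkpoints/ingest/")
        .trigger(availableNow=True)   # drain the backlog in bounded micro-batches, then stop
        .start("s3://my-bucket/tables/raw_files/")        # placeholder table path
    )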

Documentation link: https://docs.databricks.com/en/ingestion/cloud-object-storage/auto-loader/production.html#using-triggeravailablenow-and-rate-limiting

Based on the r4.2xlarge instance, we have 61 GiB of memory per instance. In our case, 3 x 61 = 183 GiB. Just make sure we don't approach this limit while processing, as it could lead to unnecessary garbage collection.

Shuffle partitions should not be a concern after AQE (Adaptive Query Execution), as it automatically coalesces small shuffle partitions.

However, with fewer partitions, the partition size will increase, potentially leading to longer processing times due to reduced parallelism.
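For reference, the AQE partition coalescing mentioned above is governed by these standard Spark 3.x confs (a sketch; whether it applies to a given streaming plan depends on your Spark/Databricks runtime version):

    # Enable AQE and automatic coalescing of small shuffle partitions.
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
    # Target size coalescing aims for; with AQE on, spark.sql.shuffle.partitions
    # effectively becomes an upper bound rather than the exact partition count.
    spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")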

Upvotes: 0

Perl99

Reputation: 153

No amount of guessing will provide the optimal answer; the only way is to measure.

Small files are fast to process but add scheduling overhead. Next time, you could either start with no more than 3x as many tasks as cores, or use a larger cluster. Usually a larger cluster is not a problem: the job will take less time, so it should cost you about the same amount of money in the end.

You can set the environment variable SPARK_WORKER_CORES to 2x your cores to make Spark run more than one task on the same core, using more of the CPU while another task waits on I/O. See this section in the docs: https://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts

Upvotes: 0
