Krzysztof Chris Mejka

Reputation: 534

Spark: set maximum partition size when joining

When doing a join in Spark, or generally for any shuffle operation, I can set the number of partitions in which I want Spark to execute the operation.

As per documentation:

spark.sql.shuffle.partitions (default: 200): Configures the number of partitions to use when shuffling data for joins or aggregations.

If I want to reduce the amount of work done in each task, I would have to estimate the total size of the data and adjust this parameter accordingly (more partitions means less work per task, but more tasks), roughly as in the sketch below.
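A minimal sketch of that manual tuning, assuming an existing SparkSession named `spark` and two hypothetical DataFrames `orders` and `customers`; the 100 GB estimate and 128 MB per-task target are made-up numbers for illustration:

```scala
// Rough estimate of the data that will be shuffled (hypothetical figure).
val estimatedShuffleBytes = 100L * 1024 * 1024 * 1024   // ~100 GB
// Desired amount of work per task (hypothetical target).
val targetBytesPerTask    = 128L * 1024 * 1024          // 128 MB

// Derive the partition count from the estimate (~800 partitions here).
val partitions = math.max(1, (estimatedShuffleBytes / targetBytesPerTask).toInt)
spark.conf.set("spark.sql.shuffle.partitions", partitions.toString)

// The join now shuffles into the computed number of partitions.
val joined = orders.join(customers, Seq("customer_id"))
```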

I am wondering: can I tell Spark to simply adjust the number of partitions based on the amount of data, i.e. set a maximum partition size for join operations?

Additional question - how does Spark know the total size of the datasets being processed when it repartitions them into 200 roughly equal partitions?

Thanks in advance!

Upvotes: 2

Views: 6041

Answers (1)

Lior Chaga

Reputation: 1480

AFAIK, there's no option to target a specific output size for shuffle partitions, so that tuning is left to you. To some extent, though, this can be addressed on the downstream read path in certain scenarios.

Say you join data and write the output as Parquet to HDFS. You could repartition the query result down to 1 (or some very low number of) partitions. Think of it as a funnel: the join and aggregation run with 200 partitions, and then you further reduce the parallelism over the aggregated data (which should involve relatively little IO). Suppose you aim for a 256 MB block size. The output will land somewhere around that size, well below it, or well above it. In the first two cases you have essentially achieved what you were aiming for: avoiding overly fragmented data (and, in the case of HDFS, too many blocks in the namenode). If the output is well above the target block size, which would obviously affect the execution time of downstream jobs, you can use spark.sql.files.maxPartitionBytes to control the number of partitions the data is read into. So even with 2 GB of output, setting this parameter to 128 MB would yield 16 partitions on the read path.
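A sketch of that funnel, assuming a SparkSession `spark`, hypothetical DataFrames `left` and `right`, and a hypothetical HDFS path:

```scala
// Join and aggregate with the usual shuffle parallelism.
spark.conf.set("spark.sql.shuffle.partitions", "200")

val joined = left.join(right, Seq("id"))
  .groupBy("id")
  .count()

// Funnel the (relatively small) aggregated result into few output files.
joined.repartition(1)
  .write
  .mode("overwrite")
  .parquet("hdfs:///output/joined")

// Downstream job: if the output turned out much larger than the target block
// size, cap the input split size so the data is read into more partitions.
spark.conf.set("spark.sql.files.maxPartitionBytes", (128L * 1024 * 1024).toString)
val reread = spark.read.parquet("hdfs:///output/joined")   // ~2 GB -> ~16 partitions
```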

Regarding your 2nd question, Spark just uses a hash partitioner and computes the hash over the join columns. You can, of course, affect the partitioning by using DISTRIBUTE BY, as in the sketch below.
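A short sketch of both forms, assuming a hypothetical DataFrame `events` with a `user_id` column:

```scala
import org.apache.spark.sql.functions.col

events.createOrReplaceTempView("events")

// SQL form: DISTRIBUTE BY routes rows to shuffle partitions by hash of the column.
val byUserSql = spark.sql("SELECT * FROM events DISTRIBUTE BY user_id")

// DataFrame equivalent: repartition by column uses the same hash partitioning.
val byUserDf = events.repartition(col("user_id"))
```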

Upvotes: 4
