Eric C

Reputation: 165

Spark Repartition Executors

I have a data source, around 100GB, and I'm trying to write it partitioned using a date column.

In order to avoid small chunks inside the partitions, I've added a repartition(5) to have at most 5 files inside each partition:

df.repartition(5).write.orc("path")

My problem here is that only 5 executors out of the 30 I'm allocating are actually running. In the end I have what I want (5 files inside each partition), but since only 5 executors are running, the execution time is extremely high.

Do you have any suggestions on how I can make it faster?

Upvotes: 2

Views: 2081

Answers (3)

Eric C

Reputation: 165

I fixed it by simply using:

df.repartition($"dateColumn").write.partitionBy("dateColumn").orc(path)

And by allocating the same number of executors as the number of partitions I'll have in the output.
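For context, a minimal sketch of what that setup might look like (the executor count, column name and paths below are assumptions, not values from the question):

import org.apache.spark.sql.SparkSession

// Sketch only: tune spark.executor.instances to the actual number of distinct dates.
val spark = SparkSession.builder()
  .appName("partitioned-orc-write")
  .config("spark.executor.instances", "20") // roughly one executor per output date partition
  .getOrCreate()
import spark.implicits._

val df = spark.read.orc("source_path") // hypothetical input path

df.repartition($"dateColumn")
  .write
  .partitionBy("dateColumn")
  .orc("path")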

Thanks all

Upvotes: 1

Vahid Shahrivari

Reputation: 138

Spark can run one concurrent task for every partition of an RDD or DataFrame, up to the number of cores in the cluster. If your cluster has 30 cores, you should have at least 30 partitions. On the other hand, a single partition typically shouldn't hold more than 128 MB, and a single shuffle block cannot be larger than 2 GB (see SPARK-6235). Since you want to reduce your execution time, it is better to increase the number of partitions while the job runs and only reduce it at the very end, just before writing. For a better (more even) distribution of your data among partitions, it is better to use a hash partitioner.
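As a rough sketch of that suggestion (the partition counts, the extra column and the transformation are assumptions, not taken from the question): keep many hash-partitioned partitions while the heavy work runs, and only reduce the partition count right before the write.

import org.apache.spark.sql.functions.col

// Sketch only: 120 is an assumed partition count (~4x the 30 cores mentioned above).
// repartition(n, col) hash-partitions the rows, so all rows for the same date end up
// in the same partition while the cluster stays fully busy.
val processed = df
  .repartition(120, col("dateColumn"))
  .withColumn("doubled", col("value") * 2) // placeholder transformation; "value" is a hypothetical column

// Reduce the number of partitions only at the end, just before writing.
processed
  .coalesce(30)
  .write
  .partitionBy("dateColumn")
  .orc("path")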

Upvotes: 0

Shubham Jain

Reputation: 5536

You can use repartition along with partitionBy to resolve the issue. There are two ways to do it.

Suppose you need to partition by dateColumn

df.repartition(5, 'dateColumn').write.partitionBy('dateColumn').parquet(path)

In this case the number of output partitions (and hence parallel tasks) will be equal to 5 * distinct(dateColumn), and each date partition will contain 5 files.

Another approach is to repartition your data to roughly 3 times the number of executors, then use maxRecordsPerFile to save the data. This will create files of roughly equal size, but you will lose control over the number of files created.

df.repartition(60).write.option('maxRecordsPerFile',200000).partitionBy('dateColumn').parquet(path)

Upvotes: 0
