Reputation: 165
I have a data source, around 100GB, and I'm trying to write it partitioned using a date column.
In order to avoid small chunks inside the partitions, I've added a repartition(5) to have at most 5 files inside each partition:
df.repartition(5).write.orc("path")
My problem here is that only 5 executors out of the 30 I'm allocating are actually running. In the end I have what I want (5 files inside each partition), but since only 5 executors are running, the execution time is extremely high.
Do you have any suggestions on how I can make it faster?
Upvotes: 2
Views: 2081
Reputation: 165
I fixed it simply by using:
df.repartition($"dateColumn").write.partitionBy("dateColumn").orc(path)
And by allocating the same number of executors as the number of partitions I'll have in the output.
Thanks all
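For reference, a minimal Scala sketch of that fix, assuming df, path and a column literally named dateColumn as in the question; counting the distinct dates is only one way to decide how many executors to request:
import org.apache.spark.sql.functions.col

// Count the distinct dates so --num-executors can be sized to roughly match
// the number of date directories produced by partitionBy.
val numDates = df.select(col("dateColumn")).distinct().count()

// Shuffle so each date's rows land in a single partition, then write one
// directory per date (which also yields one file per date with this layout).
df.repartition(col("dateColumn"))
  .write
  .partitionBy("dateColumn")
  .orc(path)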
Upvotes: 1
Reputation: 138
Spark can run one concurrent task for every partition of an RDD or DataFrame, up to the number of cores in the cluster. If your cluster has 30 cores, you should have at least 30 partitions. On the other hand, a single partition typically shouldn't contain more than 128 MB, and a single shuffle block cannot be larger than 2 GB (see SPARK-6235). Since you want to reduce your execution time, it is better to increase the number of partitions while the job is doing its heavy work and only reduce the number of partitions at the end, just before writing. For a better (more even) distribution of your data among partitions, it is better to use the hash partitioner.
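A hedged Scala sketch of that advice, reusing the 30 cores from the question; the column name and output path are placeholders, and the exact partition count is illustrative:
import org.apache.spark.sql.functions.col

// At least one partition per core, hash-partitioned on the date column so rows
// for the same date end up in the same partition.
val totalCores = 30                            // from the question's allocation
df.repartition(totalCores, col("dateColumn"))  // repartitioning by a column uses hash partitioning
  .write
  .partitionBy("dateColumn")
  .orc("path")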
Upvotes: 0
Reputation: 5536
You can use repartition along with partitionBy to resolve this. There are two ways to go about it.
Suppose you need to partition by dateColumn:
df.repartition(5, 'dateColumn').write.partitionBy('dateColumn').parquet(path)
In this case the number of output files will be at most 5 per distinct value of dateColumn, so each date will contain at most 5 files.
Another approach is to repartition your data to 3 times the number of executors, then use maxRecordsPerFile when saving. This will create files of roughly equal size, but you will lose control over the number of files created:
df.repartition(60).write.option('maxRecordsPerFile',200000).partitionBy('dateColumn').parquet(path)
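The same idea as a Scala sketch, writing ORC as in the question; the 30-executor figure comes from the question, and 200000 (like the 60 above) is just an example value:
// Spread the data over 3x the number of executors, then let maxRecordsPerFile
// cap the rows per output file instead of fixing the file count.
val numExecutors = 30                          // assumed from the question
df.repartition(numExecutors * 3)
  .write
  .option("maxRecordsPerFile", 200000L)        // file count becomes data-dependent
  .partitionBy("dateColumn")
  .orc("path")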
Upvotes: 0