Reputation: 25
I need to output a single file per prefix, so the code is written like this:

```scala
ds.repartition(1).write.partitionBy("prefix").mode(SaveMode.Overwrite).csv(output)
```
Before, the code did not call repartition; each prefix ended up with thousands of files, and the job completed in 2 hours. After adding repartition, each prefix gets 1 file, but the job runs for more than 7 hours. At what stage is repartition executed? Am I using it correctly?
Upvotes: 1
Views: 760
Reputation: 2988
Whenever you call repartition, Spark does a full shuffle and distributes the data as evenly as possible. In your case, ds.repartition(1) shuffles all the data and brings it into a single partition on one of the worker nodes.

When you then perform the write, only one worker node/executor does the writing after partitioning by prefix. Since a single worker is doing all the work, it takes a lot of time.

A few things you could take into consideration:
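One option that follows from the explanation above is to repartition by the same column you pass to partitionBy, instead of collapsing to a single partition. A minimal sketch, assuming `ds` has a `prefix` column:

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// Shuffle rows so that all rows sharing a prefix land in the same
// partition; partitionBy then emits one file per prefix value, and
// different prefixes can still be written in parallel by different tasks.
ds.repartition(col("prefix"))
  .write
  .partitionBy("prefix")
  .mode(SaveMode.Overwrite)
  .csv(output)
```

Two prefixes may hash into the same partition, but partitionBy still splits the output by value, so each prefix directory gets a single file while the write stays distributed.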
Upvotes: 2
Reputation: 304
If you want to use prefix as a partition column, then you need to run
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
You can also use coalesce(1) instead of repartition(1), because coalesce does not shuffle while repartition does. Either way, with only one partition there is just one task processing all the data, which is why it takes 7 hours.
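A sketch of the coalesce variant described above. Note that it avoids the full shuffle, but the write is still funneled through a single task, so the single-writer bottleneck remains:

```scala
import org.apache.spark.sql.SaveMode

// coalesce(1) merges the existing partitions into one without a full
// shuffle, but the entire write is then performed by a single task.
ds.coalesce(1)
  .write
  .partitionBy("prefix")
  .mode(SaveMode.Overwrite)
  .csv(output)
```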
Upvotes: 0