datagic

Spark - In this case, when does repartition occur?

I need to output a single file in each prefix directory, so the code is written like this:

ds.repartition(1).write.partitionBy("prefix").mode(SaveMode.Overwrite).csv(output)

Before I added repartition, each prefix had thousands of files, and the job completed in 2 hours. After adding repartition, each prefix has 1 file, but the job runs for more than 7 hours. At what stage is repartition executed? Am I using it correctly?

Answers (2)

Nikunj Kakadiya

Whenever you call repartition, Spark performs a full shuffle and distributes the data as evenly as it can. repartition is a lazy transformation, so that shuffle actually runs as part of the job triggered by the write. In your case, ds.repartition(1) shuffles all the data into a single partition on one of the worker nodes.

When you then perform the write, only one executor (a single task) does all the writing after partitioning by prefix. Because a single task does all the work, the job takes a long time.
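
A minimal sketch, assuming Spark 2.x or later and the Scala Dataset API used in the question. It checks that repartition(1) leaves exactly one partition, and shows one alternative the answer does not mention: repartitioning by the same column that is passed to partitionBy. The object name, helper names, sample data and output path are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

object RepartitionSketch {
  // Hypothetical sample data standing in for the question's `ds`: 5 prefixes, 10,000 rows.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    (1 to 10000).map(i => (s"p${i % 5}", i)).toDF("prefix", "value")
  }

  // The question's approach: repartition(1) shuffles every row into ONE partition,
  // so the write stage that follows runs as a single task.
  def singlePartitionCount(ds: DataFrame): Int =
    ds.repartition(1).rdd.getNumPartitions

  // Alternative: hash-partition on the column used by partitionBy. All rows of a given
  // prefix end up in the same partition, so each prefix directory still gets one file,
  // but different prefixes are written by different tasks in parallel.
  def writeOneFilePerPrefix(ds: DataFrame, output: String): Unit =
    ds.repartition(col("prefix"))
      .write
      .partitionBy("prefix")
      .mode(SaveMode.Overwrite)
      .csv(output)

  def main(args: Array[String]): Unit = {
    // local[4] is only for trying this out; on a cluster the master comes from spark-submit.
    val spark = SparkSession.builder().master("local[4]").appName("repartition-sketch").getOrCreate()
    val ds = sample(spark)
    println(s"partitions after repartition(1): ${singlePartitionCount(ds)}")
    // Hypothetical output path, used when no argument is given.
    writeOneFilePerPrefix(ds, args.headOption.getOrElse("/tmp/repartition-sketch"))
    spark.stop()
  }
}
```

Because all rows for one prefix hash to the same partition, each prefix directory should receive a single part file, while the number of parallel write tasks is governed by spark.sql.shuffle.partitions (and, in newer versions, adaptive query execution). A very large prefix is still written by a single task.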

A few things you could take into consideration:

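  1. If there is no real reason to have only one CSV file, try to avoid it.
  2. Instead of repartition(1), consider coalesce(1), which avoids the full shuffle that repartition(1) performs. Be aware, though, that the Spark documentation warns that a drastic coalesce such as coalesce(1) can also pull the upstream computation onto a single node; repartition(1) adds a shuffle but lets the upstream stages still run in parallel.
  3. By saving a single CSV file, you are not taking advantage of Spark's parallelism.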
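
To see the difference between the two operators in item 2, here is a small sketch under the same assumptions (Scala API, local session; the object name and sample data are hypothetical). Both variants end with one partition; only the repartition(1) plan should contain an Exchange, which is Spark's shuffle operator.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, concat, lit}

object CoalesceVsRepartitionSketch {
  // Hypothetical input spread over 8 partitions, with a "prefix" column like the question's data.
  def sample(spark: SparkSession): DataFrame =
    spark.range(0, 100000, 1, 8)
      .withColumn("prefix", concat(lit("p"), (col("id") % 5).cast("string")))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("coalesce-vs-repartition").getOrCreate()
    val ds = sample(spark)

    val viaCoalesce = ds.coalesce(1)       // narrow dependency: merges existing partitions, no shuffle
    val viaRepartition = ds.repartition(1) // wide dependency: full shuffle into one partition

    // Both leave exactly one partition, i.e. one task for whatever runs next (such as the write).
    println(viaCoalesce.rdd.getNumPartitions)
    println(viaRepartition.rdd.getNumPartitions)

    // Compare the physical plans: the repartition plan contains an Exchange (shuffle) node,
    // the coalesce plan does not.
    viaCoalesce.explain()
    viaRepartition.explain()

    spark.stop()
  }
}
```

Whichever operator you pick, the stage that writes the output still runs as one task, so neither restores the parallelism the write had before repartition(1) was added.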

Gaurav

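If you want to use prefix as a dynamic partition column when writing into a Hive table, you need to run the following (a plain partitionBy(...).csv(path) write to a directory does not depend on these Hive settings):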

spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

You can also use coalesce(1) instead of repartition(1): in this case coalesce does not shuffle, while repartition does. Either way there is only one partition, so a single task has to process all the data, which is why the job takes more than 7 hours.
