Reputation: 8678
spark.driver.memory=4g
spark.executor.memory=39g
spark.yarn.executor.memoryoverhead=4
spark.dynamicAllocation.maxExecutors=10
The YARN queue has 750 GB and 150 vcores.
The overall implementation looks like this:
JavaRDD<Collection<InternalObject>> finalRDD = parentJavaRDD
    .filter(<implementation of Function>)
    .mapToPair(<implementation of PairFunction>)
    .reduceByKey(<implementation of Function2>)
    .values();

finalRDD
    .map(<implementation of Function>)
    .saveAsTextFile(outputHDFSPath);
When I look at the Spark executor logs on the YARN history server, I see that 1 out of 9 executors took approximately 2 hours, while everyone else completed within a few minutes.
What else can I optimize here? Given that only 1 executor is taking ~2 hours, should I use repartition, given that the shuffle volume is quite low?
.map(<implementation of Function>)
.repartition(finalRDD.partitions().size())
.saveAsTextFile(outputHDFSPath);
Upvotes: 2
Views: 290
Reputation: 6338
I would suggest repartitioning your data on an approximately normally distributed key, in such a way that each task finishes within a few minutes.
The strategy for picking the repartition column depends entirely on the kind of data you are working with.
If you know the data statistics/behaviour well in advance, then I would suggest picking a column with less skewed data and performing a few experiments with the partition number (the default is 200 = spark.sql.shuffle.partitions). Your target here should be to pick a partition number such that each task completes within a few minutes. If you don't want to perform any experiments, you can go with the default partition number.
For this, you need to compute the skewness of all fields and pick the one with the lowest skew. There are multiple statistical techniques for testing normality, e.g. the Kolmogorov-Smirnov test (https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test). Spark even has an API for this:
import org.apache.spark.mllib.stat.Statistics

// run a KS test for the sample versus a standard normal distribution
val testResult = Statistics.kolmogorovSmirnovTest(data, "norm", 0, 1)
println(testResult)

Ref: https://spark.apache.org/docs/1.6.3/mllib-statistics.html#hypothesis-testing
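Once a key is chosen, the repartitioning itself might look like the following minimal Java sketch (hypothetical: lowSkewKey() and the partition count of 200 are illustrative assumptions, not taken from the question):

import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Key each record on the chosen low-skew field, then hash-partition so
// records spread roughly evenly across tasks. lowSkewKey() and the
// partition count (200) are assumptions for illustration only.
JavaPairRDD<String, InternalObject> balanced = parentJavaRDD
    .mapToPair(obj -> new Tuple2<>(lowSkewKey(obj), obj))
    .partitionBy(new HashPartitioner(200));

With an evenly distributed key, no single task receives a disproportionate share of records, which is what keeps every task within the few-minutes range.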
If you want to go with some simpler, heuristic logic instead, just find the skew for all columns and pick the one with the fewest duplicates. Sample logic for one column:
select a.f1, count(*) cnt from A a
group by a.f1
order by cnt desc
limit 1;
See also: Why is the final reduce step extremely slow in this MapReduce? (HiveQL, HDFS MapReduce)
PS: this will add to the overall execution time, so I would suggest going with option #1 if possible.
Upvotes: 2
Reputation: 2838
Instead of reduceByKey, try using aggregateByKey:
- Using aggregateByKey splits the calculation into two steps:
  - Aggregate by partition first; only one pair per key, per partition is shuffled, greatly reducing I/O and memory usage
  - Then combine the partition-level aggregations
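A minimal sketch in Java (hypothetical: pairRDD stands for the JavaPairRDD<String, InternalObject> produced by the question's mapToPair step, and the list accumulator mirrors the question's Collection<InternalObject> values):

import java.util.ArrayList;
import org.apache.spark.api.java.JavaPairRDD;

// zeroValue: a fresh, empty accumulator created per key, per partition
JavaPairRDD<String, ArrayList<InternalObject>> aggregated = pairRDD.aggregateByKey(
    new ArrayList<InternalObject>(),
    // seqFunc: fold one value into the partition-local accumulator
    (acc, value) -> { acc.add(value); return acc; },
    // combFunc: merge accumulators from different partitions; only these
    // (one pair per key, per partition) cross the network in the shuffle
    (left, right) -> { left.addAll(right); return left; });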
- Partitioning can have a huge impact on performance in Spark
- There are several factors to consider:
  - You want your data:
    - Evenly distributed
    - In preferred locations
      - On systems like Hadoop and Cassandra, partitions should align with cores
  - Number of CPU cores
  - How long it takes to run a task
- Avoid shuffling:
  - Movement of data between nodes is very expensive
  - It can also cause OOM errors
- It all depends on your data and the type of operations you will perform
- Default partitioning is defined by the input format
  - E.g., on Hadoop it splits by HDFS blocks
- Filter or map don't change partitioning
- Repartition: increase partitions
  - Rebalance partitions after a filter
  - Increase parallelism
repartition(numPartitions: Int)
- Coalesce: decrease partitions WITHOUT shuffle
  - Consolidate before outputting to HDFS/external storage
coalesce(numPartitions: Int, shuffle: Boolean = false)
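For illustration, a minimal Java sketch of where each call typically fits (the RDD names and partition counts are assumptions, not from the question):

import org.apache.spark.api.java.JavaRDD;

// After a selective filter, repartition to rebalance the surviving records
// and restore parallelism (this does incur a shuffle)
JavaRDD<String> rebalanced = filteredRDD.repartition(200);

// Before writing to HDFS, coalesce to fewer partitions to avoid producing
// many tiny output files; coalesce does not shuffle by default
rebalanced.coalesce(10).saveAsTextFile(outputHDFSPath);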
Upvotes: 0