Luis Leal

Reputation: 3514

How spark distributes partitions to executors

I have a performance issue, and after analyzing the Spark web UI I found what seems to be data skew (see the attached Spark UI screenshot).

Initially I thought the partitions were not evenly distributed, so I analyzed the row count per partition, but it looks normal (no outliers): how to manually run pyspark's partitioning function for debugging
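For reference, a minimal sketch of that kind of per-partition check (assuming the DataFrame df from the query below):

# Row count per partition; a flat distribution here rules out
# partition-level skew as the cause
counts = df.rdd.glom().map(len).collect()
print(counts)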

But the problem persists, and I see there is one executor processing most of the data (see the attached executor summary screenshot).

So the hypothesis now is that partitions are not evenly distributed across executors. The question is: how does Spark distribute partitions to executors, and how can I change this to solve my skew problem?

The code is very simple:

hive_query = """SELECT ... FROM <multiple joined hive tables>"""
df = sqlContext.sql(hive_query).cache()
print(df.count())

Update: after posting this question I performed further analysis and found that there are 3 tables that cause this. If they are removed, the data is evenly distributed across the executors and performance improves. So I added the Spark SQL hint /*+ BROADCASTJOIN(<table_name>) */ and it worked; performance is much better now.
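For reference, a minimal sketch of the fix (the table and column names are placeholders, not the real query):

from pyspark.sql.functions import broadcast

# Hint embedded in the SQL text, as described above
hive_query = """
SELECT /*+ BROADCASTJOIN(small_table) */ big_table.id, small_table.label
FROM big_table
JOIN small_table ON big_table.key = small_table.key
"""
df = sqlContext.sql(hive_query)

# Equivalent DataFrame API form using broadcast()
df2 = sqlContext.table("big_table").join(
    broadcast(sqlContext.table("small_table")), "key")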

But the question remains: why do these tables (including a small table with only 6 rows) cause this uneven distribution across executors when added to the query?

Upvotes: 5

Views: 3205

Answers (2)

Partha Deb

Reputation: 183

When you are reading data from HDFS, the number of partitions depends on the number of blocks you are reading. From the images attached it looks like your data is not distributed evenly across the cluster. Try repartitioning your data and tweaking the number of cores and executors.
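A minimal sketch of both knobs (the numbers are illustrative, not tuned recommendations):

# Spread rows across more partitions; 200 is an illustrative value
df = df.repartition(200)

# Executor sizing is set at submit time, e.g.:
#   spark-submit --num-executors 10 --executor-cores 4 ...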

Even if you repartition your data, the hash partitioner may return some values more often than others, which can lead to data skew.

If this happens after performing a join, then your data itself is skewed on the join key.
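A quick way to check for this (the column name "key" is a placeholder for the actual join key):

# Count rows per join-key value; a few very large counts mean the
# join output will pile up on the executors that own those keys
df.groupBy("key").count().orderBy("count", ascending=False).show(20)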

Upvotes: 2

Amiya Mishra

Reputation: 27

repartition() will not necessarily give you an evenly distributed dataset, since Spark internally uses a HashPartitioner. To spread your data evenly across all partitions, in my opinion a custom partitioner is the way to go.

In this case you need to extend the org.apache.spark.Partitioner class and use your own logic instead of the HashPartitioner. To achieve this, you need to convert the RDD to a pair RDD.
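In PySpark, the equivalent is to convert to a pair RDD and pass your own partitioning function to partitionBy(); a minimal sketch (the key column, hot key, and partition count are made-up illustrations):

NUM_PARTITIONS = 8
HOT_KEY = "very_frequent_value"  # hypothetical skewed key

# Route the known-hot key to its own partition, hash the rest
def skew_aware_partitioner(key):
    if key == HOT_KEY:
        return 0
    return 1 + (hash(key) % (NUM_PARTITIONS - 1))

# Convert to a pair RDD of (key, row), then apply the custom function
pairs = df.rdd.map(lambda row: (row["key"], row))
repartitioned = pairs.partitionBy(NUM_PARTITIONS, skew_aware_partitioner)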

The blog post below may help in your case: https://blog.clairvoyantsoft.com/custom-partitioning-spark-datasets-25cbd4e2d818

Thanks

Upvotes: 3
