Reputation: 3514
I have a performance issue, and after analyzing the Spark web UI I found what seems to be data skewness:
Initially I thought the partitions were not evenly distributed, so I analyzed the row count per partition, but it seems normal (no outliers): how to manually run pyspark's partitioning function for debugging
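For reference, one quick way to get that row count per partition in PySpark (a sketch, assuming df is the DataFrame built below) is to group by spark_partition_id():

from pyspark.sql.functions import spark_partition_id

# rows per partition; an even spread here rules out partition-level skew
df.groupBy(spark_partition_id().alias("partition_id")).count().show()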
But the problem persists, and I can see one executor processing most of the data:
So the hypothesis now is that the partitions are not evenly distributed across executors. The questions are: how does Spark distribute partitions to executors, and how can I change it to solve my skewness problem?
The code is very simple:
hive_query = """SELECT ... FROM <multiple joined hive tables>"""
df = sqlContext.sql(hive_query).cache()
print(df.count())
Update: after posting this question I performed further analysis and found that there are 3 tables causing this; if they are removed, the data is evenly distributed across the executors and performance improves. So I added the Spark SQL hint /*+ BROADCASTJOIN(<table_name>) */ and it worked; performance is much better now, but the question remains:
Why do these tables (including a small 6-row table) cause this uneven distribution across executors when added to the query?
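For completeness, a sketch of how the fix looks, both as the SQL hint and as the equivalent DataFrame API call (the table and column names here are placeholders):

from pyspark.sql.functions import broadcast

# hint form, embedded in the SQL itself
hive_query = """SELECT /*+ BROADCASTJOIN(small_table) */ ... FROM <multiple joined hive tables>"""
df = sqlContext.sql(hive_query)

# equivalent DataFrame API form
df = large_df.join(broadcast(small_df), "join_key")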
Upvotes: 5
Views: 3205
Reputation: 183
When you are reading data from HDFS, the number of partitions depends on the number of blocks you are reading. From the images attached, it looks like your data is not distributed evenly across the cluster. Try repartitioning your data and tweaking the number of cores and executors.
If you are repartitioning your data and the hash partitioner returns some values far more often than others, that can lead to data skew.
If this happens after performing a join, then your join keys are skewed.
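As a rough sketch (the column name and partition count are placeholders), repartitioning on the join key and re-checking the spread could look like this:

from pyspark.sql.functions import spark_partition_id

# hash-partition on the join key into 200 partitions
df = df.repartition(200, "join_key")

# re-check the spread after repartitioning
df.groupBy(spark_partition_id().alias("partition_id")).count().show()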
Upvotes: 2
Reputation: 27
repartition() will not give you an evenly distributed dataset, because Spark internally uses a HashPartitioner. To spread your data evenly across all partitions, in my view a custom partitioner is the way to go. In that case you need to extend the org.apache.spark.Partitioner class and use your own logic instead of the HashPartitioner. To achieve this you need to convert the RDD to a PairRDD.
I found the blog post below, which may help in your case: https://blog.clairvoyantsoft.com/custom-partitioning-spark-datasets-25cbd4e2d818
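Since the question uses PySpark, where there is no Partitioner class to extend, the rough equivalent is to convert the DataFrame to a pair RDD and pass a custom partition function to partitionBy (a sketch; the key column, partition count, and hash logic are placeholders):

# key the RDD on the skewed column so partitionBy can use it
pair_rdd = df.rdd.map(lambda row: (row["join_key"], row))

def custom_partitioner(key):
    # your own logic goes here; Spark takes this value modulo the partition count
    return hash(key)

repartitioned = pair_rdd.partitionBy(100, custom_partitioner)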
Thanks
Upvotes: 3