Reputation: 73
I want to partition a dataframe "df1" on 3 columns. This dataframe has exactly 990 unique combinations of those 3 columns:
In [17]: df1.createOrReplaceTempView("df1_view")
In [18]: spark.sql("select count(*) from (select distinct(col1,col2,col3) from df1_view) as t").show()
+--------+
|count(1)|
+--------+
| 990|
+--------+
In order to optimize the processing of this dataframe, I want to partition df1 so that I get 990 partitions, one for each possible key:
In [19]: df1.rdd.getNumPartitions()
Out[19]: 24
In [20]: df2 = df1.repartition(990, "col1", "col2", "col3")
In [21]: df2.rdd.getNumPartitions()
Out[21]: 990
I wrote a simple way to count rows in each partition:
In [22]: def f(iterator):
    ...:     a = 0
    ...:     for partition in iterator:
    ...:         a = a + 1
    ...:     print(a)
    ...:
In [23]: df2.foreachPartition(f)
And I notice that what I get is in fact 628 partitions holding one or more key values, and 362 empty partitions.
I assumed Spark would repartition evenly (1 key value = 1 partition), but that does not seem to be the case, and I feel like this repartitioning is adding data skew even though it should be the other way around...
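For reference, a driver-side way to gather the same per-partition counts, instead of printing on each executor, could look like this sketch (using the same df2 as above):

counts = df2.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()  # one count per partition
print(len(counts))                        # 990 partitions in total
print(sum(1 for c in counts if c == 0))   # how many of them are empty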
What algorithm does Spark use to partition a dataframe on columns? Is there a way to achieve what I thought was possible?
I'm using Spark 2.2.0 on Cloudera.
Upvotes: 7
Views: 5851
Reputation: 2944
To distribute data across partitions, Spark needs some way to convert a value of the column(s) into a partition index. There are two default partitioners in Spark - HashPartitioner and RangePartitioner. Different transformations in Spark can apply different partitioners - e.g. join will apply a hash partitioner.
Basically, for the hash partitioner the formula to convert a value into a partition index is value.hashCode() % numOfPartitions. In your case, multiple values are mapping to the same partition index.
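A quick plain-Python simulation (not Spark code, and not the exact hash Spark uses - just an illustration of the modulo effect) shows why hashing 990 distinct keys into 990 partitions leaves roughly a third of them empty; the expected share of non-empty partitions is about 1 - 1/e ≈ 63%:

import random

n = 990
used = set()
for _ in range(n):
    h = random.getrandbits(32)   # stand-in for a key's hash code
    used.add(h % n)              # partition index = hash % number of partitions

print(len(used))       # typically around 625-630 non-empty partitions
print(n - len(used))   # the remaining ~360 partitions receive no key at all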
You could implement your own partitioner if you want a better distribution. More about it here, here and here.
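For example, one way to get exactly one key per partition in PySpark (a sketch, not a drop-in solution - it assumes the key columns are named col1, col2 and col3 as in the question, and df_exact is just an illustrative name):

# Collect the distinct keys and assign each one its own partition index.
keys = [tuple(r) for r in df1.select("col1", "col2", "col3").distinct().collect()]
key_to_index = {k: i for i, k in enumerate(keys)}

def key_of(row):
    return (row["col1"], row["col2"], row["col3"])

# partitionBy with a custom partition function: 1 key = 1 partition, no hash collisions.
rdd = (df1.rdd
          .keyBy(key_of)
          .partitionBy(len(keys), lambda k: key_to_index[k])
          .values())

df_exact = spark.createDataFrame(rdd, schema=df1.schema)
print(df_exact.rdd.getNumPartitions())   # 990, one per key combination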
Upvotes: 9