Reputation: 131
I want to repartition my Spark DataFrame based on a column X. Say column X has 3 distinct values (X1, X2, X3); the number of distinct values can vary.
I want each partition to contain records with only one value of X, i.e. I want 3 partitions: one with records where X=X1, another with X=X2, and the last with X=X3.
I get the unique values of X from the DataFrame with the query
val uniqueList = DF.select("X").distinct().map(x => x(0).toString).collect()
which gives the list of unique values correctly.
And to repartition I am doing
DF = DF.repartition(uniqueList.length, col("X"))
However, the partitions in DF are not coming out as expected. The data is not distributed correctly: one partition is empty, the second contains records with X1, and the third has records with both X2 and X3.
Can someone please point out what I am missing?
EDIT:
My column X can have a varying number of unique values. It could have 3 or 3000 unique values. If I do the following
DF = DF.repartition(col("X"))
I will only get 200 partitions, as that is the default value of spark.sql.shuffle.partitions. That is why I am passing the number of partitions explicitly.
If there are 3000 unique values of X then I want to repartition my DF so that there are 3000 partitions and each partition contains the records for one particular value of X, so that I can run mapPartitions and process each partition in parallel.
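For reference, a minimal sketch of the two ways I see to control the partition count (assuming a SparkSession named spark); neither changes how values are assigned to partitions:
// Option 1: raise the shuffle-partition setting, then repartition by the column alone
spark.conf.set("spark.sql.shuffle.partitions", uniqueList.length)
val byColumn = DF.repartition(col("X"))
// Option 2: pass the partition count explicitly (what I am doing now)
val byCount = DF.repartition(uniqueList.length, col("X"))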
Upvotes: 2
Views: 2176
Reputation: 19308
Does this work?
val repartitionedDF = DF.repartition(col("X"))
Here's an example I blogged about
Data:
first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China
Code:
df
.repartition(col("country"))
.write
.partitionBy("country")
.parquet(outputPath)
Filesystem output:
partitioned_lake1/
country=Argentina/
part-00044-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
country=China/
part-00059-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
country=Russia/
part-00002-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
Upvotes: 0
Reputation: 20551
Repartitioning is based on hash partitioning (take the hash code of the partitioning key modulo the number of partitions), so whether each partition only has one value is purely chance.
If you can map each partitioning key to a unique Int in the range zero to (number of unique values - 1), then, since the hash code of an Int in Scala is that integer, this would ensure that, as long as there are at least as many partitions as there are unique values, no partition has multiple distinct partitioning key values.
That said, coming up with the assignment of values to such Ints is inherently not parallelizable and requires either a sequential scan or knowing the distinct values ahead of time.
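A minimal sketch of that idea using the RDD-level HashPartitioner (an Int's hashCode is the Int itself, so key i lands in partition i); the DataFrame name DF and string column "X" are assumed from the question:
import org.apache.spark.HashPartitioner

// Assign each distinct value of X a sequential index (this step is not parallelizable).
val distinctValues = DF.select("X").distinct().collect().map(_.getString(0))
val indexOf = DF.sparkSession.sparkContext.broadcast(distinctValues.zipWithIndex.toMap)

// Key each row by its index; HashPartitioner sends key i to partition i,
// so every partition holds rows for exactly one value of X.
val partitioned = DF.rdd
  .map(row => (indexOf.value(row.getAs[String]("X")), row))
  .partitionBy(new HashPartitioner(distinctValues.length))
  .values // an RDD[Row] ready for mapPartitions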
Probabilistically, the chance that a particular value hashes into a particular one of n partitions is 1/n. As n increases relative to the number of distinct values, the chance that no partition holds more than one distinct value increases (in the limit, if you could have 2^32 partitions, nearly all of them would be empty, but an actual hash collision would still put multiple distinct values in one partition). So if you can tolerate empty partitions, choosing a number of partitions sufficiently greater than the number of distinct values reduces the chance of a sub-ideal result.
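As a rough illustration (a birthday-problem estimate, assuming uniform independent hashing; the helper name is made up), the chance that k distinct values all land in different partitions out of n is:
// Illustrative only: probability that k distinct values hash into k different partitions out of n
def allDistinctProb(k: Int, n: Int): Double =
  (0 until k).map(i => 1.0 - i.toDouble / n).product

allDistinctProb(3, 200)     // ≈ 0.985 with the default 200 shuffle partitions
allDistinctProb(3000, 3000) // ≈ 0: collisions are essentially certain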
Upvotes: 4
Reputation: 1712
By any chance, does your column X contain null values? If so, Spark tries to create one partition for them. Since you are also giving the number of partitions as an int, maybe Spark tries to squish X2 and X3 together. So you can try two things: just give the column name for repartitioning (you will still get one extra partition), or try removing the null values from X, if they exist.
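For example (a sketch, assuming the DataFrame and column names from the question):
import org.apache.spark.sql.functions.col

// Drop rows where X is null, then repartition by the column alone
val cleaned = DF.filter(col("X").isNotNull).repartition(col("X"))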
Upvotes: 0