Tusharjain93

Reputation: 131

Scala - Spark Repartition not giving expected results

I want to repartition my Spark dataframe based on a column X. Say column X has 3 distinct values (X1, X2, X3). The number of distinct values can vary.

I want each partition to contain records with only one value of X, i.e. I want 3 partitions: one with records where X=X1, another with X=X2, and the last with X=X3.

I get the unique values of X from the dataframe with the query

val uniqueList = DF.select("X").distinct().map(x => x(0).toString).collect() 

which gives the list of unique values correctly.

And to repartition I am doing

DF = DF.repartition(uniqueList.length, col("X"))

However, the partitions in DF are not coming out as expected. The data is not distributed correctly: one partition is empty, the second contains records with X1, and the third has records with both X2 and X3.
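
A minimal sketch of one way to inspect how the rows land in partitions (using the DF from above; the output is just for illustration):

// Count rows per partition after the repartition above
val perPartitionCounts = DF.rdd
  .mapPartitionsWithIndex { (idx, rows) => Iterator((idx, rows.size)) }
  .collect()

perPartitionCounts.foreach { case (idx, n) => println(s"partition $idx -> $n rows") }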

Can someone please help me figure out what I am missing?

EDIT:

My column X could have a varying number of unique values, anywhere from 3 to 3000. If I just do

DF = DF.repartition(col("X"))

I will only get 200 partitions, as that is the default value of spark.sql.shuffle.partitions. That is why I am also passing the number of partitions.

If there are 3000 unique values of X, then I want to repartition my DF in such a way that there are 3000 partitions and each partition contains records for one particular value of X, so that I can run mapPartitions and process each partition in parallel.
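
For what it's worth, the shuffle-partition default mentioned above can also be raised instead of passing a partition count explicitly (just a sketch, assuming a SparkSession named spark is in scope):

// Raise the default number of shuffle partitions before repartitioning by the column
spark.conf.set("spark.sql.shuffle.partitions", uniqueList.length.toString)
val repartitioned = DF.repartition(col("X"))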

Upvotes: 2

Views: 2176

Answers (3)

Powers

Reputation: 19308

Does this work?

val repartitionedDF = DF.repartition(col("X"))

Here's an example I blogged about.

Data:

first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China

Code:

df
  .repartition(col("country"))
  .write
  .partitionBy("country")
  .parquet(outputPath)

Filesystem output:

partitioned_lake1/
  country=Argentina/
    part-00044-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
  country=China/
    part-00059-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet
  country=Russia/
    part-00002-cf737804-90ea-4c37-94f8-9aa016f6953a.c000.snappy.parquet

Upvotes: 0

Levi Ramsey

Reputation: 20551

Repartitioning is based on hash partitioning (take the hash code of the partitioning key modulo the number of partitions), so whether each partition only has one value is purely a matter of chance.

If you can map each partitioning key to a unique Int in the range 0 to (number of unique values - 1), then, since the hash code of an Int in Scala is the Int itself, this would ensure that, as long as there are at least as many partitions as there are unique values, no partition holds multiple distinct partitioning-key values.

That said, coming up with the assignment of values to such Ints is inherently not parallelizable and requires either a sequential scan or knowing the distinct values ahead of time.
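
A minimal sketch of that approach at the RDD level, reusing the uniqueList already collected in the question (HashPartitioner places a key by hashCode modulo the number of partitions, and an Int's hash code is the Int itself, so index i lands in partition i):

import org.apache.spark.HashPartitioner

// Assign each distinct X value a unique index 0 .. n-1 (built on the driver)
val indexOf = uniqueList.zipWithIndex.toMap

// Key each row by that index and hash-partition into exactly n partitions,
// so partition i holds only rows whose X value has index i
val oneValuePerPartition = DF.rdd
  .keyBy(row => indexOf(row.getAs[String]("X")))
  .partitionBy(new HashPartitioner(uniqueList.length))
  .values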

Probabilistically, the chance that a particular value hashes into a particular one of n partitions is 1/n. As n increases relative to the number of distinct values, the chance that no partition has more than one distinct value increases (in the limit, if you could have 2^32 partitions, nearly all of them would be empty, but an actual hash collision would still guarantee multiple distinct values in a partition). So if you can tolerate empty partitions, choosing a number of partitions that's sufficiently greater than the number of distinct values would reduce the chance of a sub-ideal result.
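
For example, something like this (a sketch; the factor of 10 is arbitrary):

// Many more partitions than distinct values makes collisions unlikely,
// at the cost of lots of empty partitions
val overPartitioned = DF.repartition(10 * uniqueList.length, col("X"))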

Upvotes: 4

Raghu

Reputation: 1712

By any chance, does your column X contain null values? If so, Spark tries to create one partition for them. Since you are also giving the number of partitions as an int, maybe Spark tries to squish X2 and X3 together. So you can try two things: just give the column name for repartitioning (you will still get one extra partition), or try removing the null values from X, if they exist.
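
Something along these lines (a sketch, reusing the DF and column from the question):

// Option 1: repartition by the column alone (per the note above, you may still get one extra partition)
val byColumnOnly = DF.repartition(col("X"))

// Option 2: drop null values of X first, then repartition
val withoutNulls = DF.filter(col("X").isNotNull).repartition(col("X"))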

Upvotes: 0
