Reputation: 516
I'm joining some DataFrames together in Spark and I keep getting the following error:
PartitioningCollection requires all of its partitionings have the same numPartitions.
It happens after I join two DataFrames that each seem perfectly fine on their own, but if I try to get a row from the joined DataFrame, I get this error. I am really just trying to understand why this error appears, or what it means, since I can't seem to find any documentation on it.
The following invocation results in this exception:
val resultDataframe = dataFrame1
  .join(dataFrame2, $"first_column" === $"second_column")
  .take(2)
but I can certainly call
dataFrame1.take(2)
and
dataFrame2.take(2)
I also tried repartitioning the DataFrames with Dataset.repartition(numPartitions) or Dataset.coalesce(numPartitions), both on dataFrame1 and dataFrame2 before the join and on resultDataframe after the join, but nothing seemed to affect the error. I haven't been able to find any reference to other people hitting this error after some cursory googling...
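For reference, the repartitioning attempt looked roughly like this (200 is just an arbitrary partition count I tried):

// Repartition both sides to the same number of partitions before the join
val df1 = dataFrame1.repartition(200)
val df2 = dataFrame2.repartition(200)

// The join still fails with the same PartitioningCollection error on take(2)
val resultDataframe = df1.join(df2, $"first_column" === $"second_column")
resultDataframe.take(2)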
Upvotes: 9
Views: 1966
Reputation: 1097
Do you call the cache method?
This problem happens to me only when I use the cache method. If I don't call it, I can use the data without any problem.
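A minimal sketch of what I mean, reusing the column names from your question:

// With cache() on the joined DataFrame, take(2) fails for me
val cached = dataFrame1
  .join(dataFrame2, $"first_column" === $"second_column")
  .cache()
cached.take(2)

// Without cache(), the same join works
val uncached = dataFrame1
  .join(dataFrame2, $"first_column" === $"second_column")
uncached.take(2)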
Upvotes: 3
Reputation: 1443
I've also had the same problem. For me it occurred after removing some columns from the select part of a join (not the join clause itself).
I was able to fix it by calling .repartition() on the dataframe.
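Roughly what that looked like (the partition count is whatever suits your data; 200 here is just an example):

// Repartition the joined result before collecting rows from it
val joined = dataFrame1
  .join(dataFrame2, $"first_column" === $"second_column")
  .repartition(200)

joined.take(2)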
Upvotes: 9
Reputation: 121
I have encountered the same issue in the last few days, and I was disappointed when I found no references on the internet. Until yours!
A couple of things I would add: I get the error after a pretty complicated set of operations on dataframes (multiple joins). Also, these operations involve dataframes generated from the same parent dataframe. I'm trying to put together a minimal example that reproduces it, but it's not trivial to extract one from my pipeline.
I suspect Spark might be having trouble computing a correct plan when the DAG gets too complicated. Unfortunately, it seems that, if it is a bug in Spark 2.0.0, the nightly builds have not fixed it yet (I tried a 2.0.2 snapshot a couple of days ago).
A practical solution that fixes the issue (temporarily) seems to be: at some point in your pipeline, write some of your dataframes to disk and read them back. This effectively forces Spark to work with a much smaller, more manageable plan to optimize, and, well, it doesn't crash anymore. Of course it's just a temporary fix.
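As a rough sketch of that workaround (the path and DataFrame names here are just placeholders):

// Hypothetical example: write an intermediate DataFrame to disk...
intermediateDf.write.mode("overwrite").parquet("/tmp/pipeline_checkpoint")

// ...and read it back, so downstream operations start from a fresh, much smaller plan
val reloadedDf = spark.read.parquet("/tmp/pipeline_checkpoint")

// Continue the rest of the pipeline from reloadedDf instead of intermediateDf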
Upvotes: 9