Reputation: 1511
I have an RDD with around 7M entries, each holding 10 normalized coordinates. I also have a number of centers, and I'm trying to map every entry to the closest (Euclidean distance) center. The problem is that this only generates one task, which means it is not parallelizing. This is the form:
def doSomething(point, centers):
    for center in centers.value:
        if distance(point, center) < 1:
            return center
    return None

preppedData.map(lambda x: doSomething(x, centers)).take(5)
The preppedData RDD is cached and already evaluated. The doSomething function is shown much simpler here than it actually is, but the principle is the same. centers is a list that has been broadcast. Why does this map run in only one task?
Similar pieces of code in other projects map to roughly 100 tasks and run on all the executors, while this one runs as 1 task on 1 executor. My job has 8 executors available, with 8 GB and 2 cores per executor.
Upvotes: 1
Views: 2980
Reputation: 737
This could be due to the conservative nature of the take() method. See the code in RDD.scala.
take() first reads the first partition of your RDD (if the RDD doesn't require a shuffle, this needs only one task), and if that one partition contains enough results, it returns them. If that partition does not hold enough data, it grows the number of partitions it scans until it has collected the required number of elements.
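To make that scale-up behaviour concrete, here is a rough Python sketch of the idea only; it is not the actual Scala source in RDD.scala, and the scale-up factor shown is illustrative:

# Rough illustration of how take(n) widens its scan; not the real Spark code.
def take_sketch(partitions, n, scale_up_factor=4):
    results = []
    scanned = 0
    num_to_scan = 1                            # start with just the first partition
    while len(results) < n and scanned < len(partitions):
        for part in partitions[scanned:scanned + num_to_scan]:
            results.extend(part)               # each partition scanned costs one task
        scanned += num_to_scan
        num_to_scan *= scale_up_factor         # still short: scan more partitions next round
    return results[:n]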
Since your RDD is already cached and your operation is only a map function, as long as the first partition of your RDD holds at least 5 rows, this will only ever require one task. More tasks would be unnecessary.
This code exists to avoid overloading the driver with too much data by fetching from all partitions at once for a small take.
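As a quick check (a sketch reusing the names from the question), an action that has to touch every partition fans out across all executors, while take(5) stays at one task:

mapped = preppedData.map(lambda x: doSomething(x, centers))
print(mapped.getNumPartitions())   # number of partitions available (e.g. 100)
print(mapped.take(5))              # 1 task: the first partition already has >= 5 rows
print(mapped.count())              # roughly one task per partition, spread over the executors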
Upvotes: 3