Reputation: 3033
It's quite unfortunate that take on RDD is a strict operation instead of a lazy one, but I won't get into why I think that's a regrettable design here and now. My question is whether this is a suitable implementation of a lazy take for RDD. It seems to work, but I might be missing some non-obvious problem with it.
import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

def takeRDD[T: scala.reflect.ClassTag](rdd: RDD[T], num: Long): RDD[T] =
  new RDD[T](rdd.context, List(new OneToOneDependency(rdd))) {
    // An unfortunate consequence of the way the RDD AST is designed
    var doneSoFar = 0L
    def isDone = doneSoFar >= num

    override def getPartitions: Array[Partition] = rdd.partitions

    // Should I do this? Doesn't look like I need to
    // override val partitioner = self.partitioner

    override def compute(split: Partition, ctx: TaskContext): Iterator[T] = new Iterator[T] {
      val inner = rdd.compute(split, ctx)
      override def hasNext: Boolean = !isDone && inner.hasNext
      override def next: T = {
        doneSoFar += 1
        inner.next
      }
    }
  }
Upvotes: 3
Views: 131
Reputation: 3725
No, this doesn't work. There's no way to have a variable which can be seen and updated concurrently across a Spark cluster, and that's exactly what you're trying to use doneSoFar as. If you try this, then when you run compute (in parallel across many nodes), you:

a) serialize the takeRDD in the task, because you reference the class variable doneSoFar. This means that you write the class to bytes and make a new instance in each JVM (executor).

b) update doneSoFar in compute, which updates the local instance on each executor JVM. You'll take a number of elements from each partition equal to num.
It's possible this will work in Spark local mode due to some of the JVM properties there, but it CERTAINLY will not work when running Spark in cluster mode.
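For illustration, here is a minimal standalone sketch of the same pitfall (not your takeRDD -- just a driver-side var captured in a closure, assuming a running SparkContext named sc):

var counter = 0L
// Each task gets a serialized copy of the closure, so every executor
// increments its own local copy of counter; the driver's copy is never updated.
sc.parallelize(1 to 100, numSlices = 4).foreach(_ => counter += 1)
println(counter)  // 0 on a cluster; may appear to "work" in local mode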
take is an action, not a transformation

RDDs are distributed, and so subsetting to an exact number of elements is an inefficient operation -- it can't be done totally in parallel, since each shard needs information about the other shards (like whether it should be computed at all). take is great for bringing distributed data back into local memory.
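To make that concrete, a tiny sketch (again assuming a live SparkContext sc):

// take is an action: it materializes results on the driver as a plain Array,
// not as another RDD.
val firstFive: Array[Int] = sc.parallelize(1 to 1000).take(5)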
rdd.sample is a similar operation that stays in the distributed world, and can be run in parallel easily.
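A hedged sketch of the contrast (the fraction and seed values here are just illustrative):

// sample is a transformation: the result is still an RDD, evaluated lazily and
// in parallel; fraction gives an approximate, not exact, subset size.
val aboutOnePercent = sc.parallelize(1 to 100000).sample(withReplacement = false, fraction = 0.01, seed = 42L)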
Upvotes: 3