jedesah

Reputation: 3033

Is this a suitable way to implement a lazy `take` on RDD?

It's quite unfortunate that `take` on RDD is a strict operation instead of a lazy one, but I won't get into why I think that's a regrettable design decision here and now.

My question is whether this is a suitable implementation of a lazy take for RDD. It seems to work, but I might be missing some non-obvious problem with it.

import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

def takeRDD[T: scala.reflect.ClassTag](rdd: RDD[T], num: Long): RDD[T] =
  new RDD[T](rdd.context, List(new OneToOneDependency(rdd))) {
    // An unfortunate consequence of the way the RDD AST is designed
    var doneSoFar = 0L

    def isDone = doneSoFar >= num

    override def getPartitions: Array[Partition] = rdd.partitions

    // Should I do this? Doesn't look like I need to
    // override val partitioner = self.partitioner

    override def compute(split: Partition, ctx: TaskContext): Iterator[T] = new Iterator[T] {
      val inner = rdd.compute(split, ctx)

      override def hasNext: Boolean = !isDone && inner.hasNext

      override def next(): T = {
        doneSoFar += 1
        inner.next()
      }
    }
  }

Upvotes: 3

Views: 131

Answers (1)

Tim

Reputation: 3725

Answer to your question

No, this doesn't work. There's no way to have a variable that can be seen and updated concurrently across a Spark cluster, and that's exactly what you're trying to use doneSoFar for. When you run compute (in parallel across many nodes), you:

a) serialize the takeRDD in the task, because you reference the class variable doneSoFar. This means the class is written to bytes and a new instance is created in each executor JVM.

b) update doneSoFar in compute, which updates only the local instance on each executor JVM. As a result you'll take up to num elements from each partition, rather than num elements total.
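This failure mode can be simulated without a cluster: if each partition gets its own deserialized copy of the counter, as each executor JVM would, the limit applies per partition instead of globally. A plain-Scala sketch (the "partitions" and counter here are illustrative, not Spark API):

```scala
object PerPartitionCounterDemo {
  def main(args: Array[String]): Unit = {
    val num = 3
    // Four "partitions" of ten elements each, as four executors might see them.
    val partitions: Seq[Seq[Int]] = (0 until 4).map(p => (p * 10) until (p * 10 + 10))

    // Each task deserializes its OWN copy of doneSoFar, so the counter
    // resets per partition instead of being shared cluster-wide.
    val taken = partitions.flatMap { part =>
      var doneSoFar = 0L // fresh copy per "executor"
      part.iterator.takeWhile { _ =>
        val ok = doneSoFar < num
        doneSoFar += 1
        ok
      }.toList
    }

    // We asked for 3 elements but got 3 from EVERY partition: 12 total.
    println(taken.size)
  }
}
```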

It's possible this will work in Spark local mode, since everything runs in a single JVM there, but it certainly will not work when running Spark in cluster mode.
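If the goal is just a take that stays an RDD, one cluster-safe (though not fully lazy) alternative is to number the elements and filter on the index: `rdd.zipWithIndex.filter(_._2 < num).map(_._1)`. Note that `zipWithIndex` on an RDD launches a job to compute per-partition offsets, so this trades laziness for correctness. The same expression on a local collection shows the shape:

```scala
object ZipWithIndexTakeDemo {
  // A take expressed as a transformation: pair each element with its index,
  // keep only indices below num. On an RDD the equivalent would be
  //   rdd.zipWithIndex.filter(_._2 < num).map(_._1)
  // (zipWithIndex on RDDs runs a job to compute partition offsets first).
  def takeAsTransformation[T](xs: Seq[T], num: Long): Seq[T] =
    xs.zipWithIndex.collect { case (x, i) if i < num => x }

  def main(args: Array[String]): Unit = {
    val data = (1 to 100).toVector
    println(takeAsTransformation(data, 5)) // Vector(1, 2, 3, 4, 5)
  }
}
```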

Why take is an action, not transformation

RDDs are distributed, so subsetting to an exact number of elements is an inherently inefficient operation -- it can't be done fully in parallel, since each shard needs information about the other shards (such as whether it should be computed at all). take is best used for bringing distributed data back into local driver memory.

rdd.sample is a similar operation that stays in the distributed world, and can be run in parallel easily.
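The reason sample parallelizes so well is that it decides per element whether to keep it, so partitions never need to coordinate. A local sketch of that per-element decision (here `fraction` is the keep probability, analogous to the fraction argument of RDD.sample; the helper name is made up for illustration):

```scala
import scala.util.Random

object SampleDemo {
  // Bernoulli-style sampling: each element is kept independently with
  // probability `fraction`, so partitions never need to talk to each
  // other -- unlike an exact take(num).
  def sample[T](xs: Seq[T], fraction: Double, seed: Long): Seq[T] = {
    val rng = new Random(seed)
    xs.filter(_ => rng.nextDouble() < fraction)
  }

  def main(args: Array[String]): Unit = {
    val data = (1 to 1000).toVector
    val s = sample(data, fraction = 0.1, seed = 42L)
    println(s.size) // roughly 100, varies with the seed
  }
}
```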

Upvotes: 3
