diens

Reputation: 659

Scala Spark: Duplicating an Iterator

I'm using the following code, but I've read that iterator.duplicate is an expensive operation. Is that right? Is there something else I could use in my code to improve performance?

myRdd is an RDD[LabeledPoint]. The goal is to transform the RDD[LabeledPoint] into an RDD[(String, LabeledPoint)]:

var result = List[(String, LabeledPoint)]()
var size = 0
var data: LabeledPoint = null
var op = ""
myRdd.mapPartitionsWithIndex(
  { (partID, iterator) => {
    val (iterator1, iterator2) = iterator.duplicate
    size = iterator1.length - 1
    while (iterator2.hasNext) {
      data = iterator2.next
      op = partID + "," + size
      result ::= (op, data)
      size = size - 1
    }
    result.iterator
  }
  }, preservesPartitioning = true)

Upvotes: 1

Views: 381

Answers (1)

Brian McCutchon

Reputation: 8584

There are two reasons that Iterator.duplicate is expensive. The first is stated in the docs:

The implementation may allocate temporary storage for elements iterated by one iterator but not yet by the other.

Indeed, since you call iterator1.length at the start, it will need to store all of the elements in memory. This could cause problems if the iterator is extremely large and you're tight on memory.
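A minimal sketch of that buffering, in plain Scala without Spark. The object and method names are made up for illustration; only Iterator.duplicate, length and toList come from the standard library:

```scala
object DuplicateBufferDemo {
  // Drains the first duplicate with `length`, then reads the second one.
  // The second copy can still yield every element, which means they were
  // all held in memory after the first copy ran ahead.
  def drainThenRead[A](source: Iterator[A]): (Int, List[A]) = {
    val (first, second) = source.duplicate
    val n = first.length        // consumes `first` and the underlying source
    (n, second.toList)          // served from duplicate's internal buffer
  }

  def main(args: Array[String]): Unit = {
    val (n, elems) = drainThenRead(Iterator(10, 20, 30))
    println(s"length = $n, second iterator still yields $elems")
  }
}
```

With a partition of millions of LabeledPoints, that buffer would hold the whole partition.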

The second, mentioned by Daniel C. Sobral in the comments here, is that the iterators it creates are synchronized, which slows them down. You can avoid this by using iterator.toSeq instead of iterator.duplicate, since you're already storing all the elements in memory anyway. Here's an example:

myRdd.mapPartitionsWithIndex(
    (partID, iterator) => iterator.toSeq.reverse.zipWithIndex.map {
      case (data, i) => (partID + "," + i, data)
    }.iterator,
    preservesPartitioning = true)
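To check that this produces the same labels and order as the loop in the question, here is a rough Spark-free comparison on a plain iterator. The names ReverseLabelCheck, viaLoop and viaToSeq are hypothetical, and viaLoop is my reading of the question's loop with the shared vars made local:

```scala
object ReverseLabelCheck {
  // The question's logic: count down from size - 1 while prepending,
  // so the output ends up in reverse order.
  def viaLoop[A](partID: Int, it: Iterator[A]): List[(String, A)] = {
    val elems = it.toVector
    var size = elems.length - 1
    var result = List.empty[(String, A)]
    for (data <- elems) {
      result = (s"$partID,$size", data) :: result
      size -= 1
    }
    result
  }

  // The toSeq-based version: reverse first, then number from 0.
  def viaToSeq[A](partID: Int, it: Iterator[A]): List[(String, A)] =
    it.toSeq.reverse.zipWithIndex.map {
      case (data, i) => (s"$partID,$i", data)
    }.toList

  def main(args: Array[String]): Unit = {
    val a = viaLoop(0, Iterator("a", "b", "c"))
    val b = viaToSeq(0, Iterator("a", "b", "c"))
    println(a == b)
  }
}
```

Both give the last element the label ending in 0 and put it first in the output.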

If you want to use less memory, that's a bit harder. You're also building the list result, which grows to the same size as the partition, so you would need to fix that too. I don't think it's possible to get the exact behavior you have (including reversing the iterator) without storing all of the elements in memory. That said, if you're okay with both the result order and the numbering being reversed relative to your code, this works with constant memory usage:

myRdd.mapPartitionsWithIndex(
    (partID, iterator) => iterator.zipWithIndex.map {
      case (data, i) => (partID + "," + i, data)
    },
    preservesPartitioning = true)
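The reason this stays at constant memory is that zipWithIndex and map on an Iterator are lazy, so each element is labelled only as it is pulled. A small sketch outside Spark, assuming a hypothetical helper named label, shows it working even on an unbounded source:

```scala
object LazyLabelDemo {
  // Same transformation as above, written as a standalone function.
  def label[A](partID: Int, it: Iterator[A]): Iterator[(String, A)] =
    it.zipWithIndex.map { case (data, i) => (s"$partID,$i", data) }

  def main(args: Array[String]): Unit = {
    // Iterator.from(100) never ends; only the three elements requested
    // by take(3) are ever produced and labelled.
    val firstThree = label(7, Iterator.from(100)).take(3).toList
    println(firstThree)
  }
}
```

If anything in the pipeline had to buffer the whole input, the call on Iterator.from(100) would never return.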

Upvotes: 4
