MetallicPriest

Reputation: 30841

How to use mapPartitions in Scala?

I am trying to use mapPartitions in Scala, but I am getting the following error:

[error]  found   : Unit
[error]  required: Iterator[?]
[error] Error occurred in an application involving default arguments.
[error]         rdd.mapPartitions(showParts)

I call the mapPartitions function as follows.

rdd.mapPartitions(showParts)

where the showParts function is defined as follows:

def showParts(iter: Iterator[(Long, Array[String])]) = 
{ 
  while (iter.hasNext)
  {
    val cur = iter.next()
    // Do something with cur
  }
}

What is the proper way of using mapPartitions here?

Upvotes: 4

Views: 4723

Answers (2)

Peter Neyens

Reputation: 9820

You need to return an Iterator from your showParts function.

def onlyEven(numbers: Iterator[Int]) : Iterator[Int] = 
  numbers.filter(_ % 2 == 0)

def partitionSize(numbers: Iterator[Int]) : Iterator[Int] = 
  Iterator.single(numbers.length)

val rdd = sc.parallelize(0 to 10, 4) // 4 partitions
rdd.mapPartitions(onlyEven).collect()
// Array[Int] = Array(0, 2, 4, 6, 8, 10)

rdd.mapPartitions(partitionSize).collect()
// Array[Int] = Array(2, 3, 3, 3)
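To see why showParts has to return an Iterator, here is a Spark-free sketch of the same contract. runPartitions is a hypothetical helper that imitates what mapPartitions does for each partition: it hands the partition to f as an Iterator[T] and expects an Iterator[U] back. The four hand-written partitions assume the same 4-way split of 0 to 10 that yields Array(2, 3, 3, 3) above.

```scala
// Spark-free sketch of the mapPartitions contract.
// runPartitions is a hypothetical helper, NOT part of Spark's API:
// each partition is passed to f as an Iterator[T], and f must return an Iterator[U].
def runPartitions[T, U](partitions: Seq[Seq[T]])(f: Iterator[T] => Iterator[U]): Seq[U] =
  partitions.flatMap(part => f(part.iterator))

def onlyEven(numbers: Iterator[Int]): Iterator[Int] =
  numbers.filter(_ % 2 == 0)

// size counts (and consumes) the remaining elements of the iterator
def partitionSize(numbers: Iterator[Int]): Iterator[Int] =
  Iterator.single(numbers.size)

// 0 to 10 split into four partitions, assumed to match the split above
val partitions = Seq(Seq(0, 1), Seq(2, 3, 4), Seq(5, 6, 7), Seq(8, 9, 10))

val evens = runPartitions(partitions)(onlyEven)      // List(0, 2, 4, 6, 8, 10)
val sizes = runPartitions(partitions)(partitionSize) // List(2, 3, 3, 3)
```

A function returning Unit, like the original showParts, cannot be passed as f here for the same reason the compiler rejects rdd.mapPartitions(showParts): Iterator[T] => Unit does not conform to Iterator[T] => Iterator[U].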

Upvotes: 1

Till Rohrmann

Reputation: 13356

The problem is that the function you pass to mapPartitions must have the return type Iterator[U]. Your current code does not return anything, so its return type is Unit.

If you want to obtain an empty RDD after performing mapPartitions, you can do the following:

def showParts(iter: Iterator[(Long, Array[String])]) = 
{ 
  while (iter.hasNext)
  {
    val cur = iter.next()
    // Do something with cur
  }

  // return Iterator[U]
  Iterator.empty 
}
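Keep in mind that mapPartitions is a lazy transformation, so the while loop above only runs once an action such as collect() or count() is called on the resulting RDD. If all you want are side effects, RDD.foreachPartition, which takes an Iterator[T] => Unit, avoids the dummy Iterator.empty. If you want one result per record, returning iter.map(...) keeps the partition streaming instead of draining it first. The sketch below runs both shapes without Spark on a hand-built iterator; the println and the (id, words.length) result are hypothetical stand-ins for "Do something with cur".

```scala
// Both functions have the shape mapPartitions expects: Iterator[T] => Iterator[U].

// Drain the partition for side effects, then return an empty iterator
// so the result type is Iterator[U] rather than Unit.
def showPartsEmpty(iter: Iterator[(Long, Array[String])]): Iterator[Nothing] = {
  while (iter.hasNext) {
    val cur = iter.next()
    println(s"${cur._1}: ${cur._2.mkString(",")}") // hypothetical stand-in for "Do something with cur"
  }
  Iterator.empty
}

// Lazy variant: emit one (hypothetical) result per record instead of draining first.
def showPartsMapped(iter: Iterator[(Long, Array[String])]): Iterator[(Long, Int)] =
  iter.map { case (id, words) => (id, words.length) }

// A def, so every call gets a fresh iterator standing in for one RDD partition.
def partition: Iterator[(Long, Array[String])] =
  Iterator((1L, Array("a", "b")), (2L, Array("c")))

val emptied = showPartsEmpty(partition).toList // List()
val mapped = showPartsMapped(partition).toList // List((1,2), (2,1))
```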

Upvotes: 3
