Reputation: 30841
I am trying to use mapPartitions in Scala but am getting the following error:
[error] found : Unit
[error] required: Iterator[?]
[error] Error occurred in an application involving default arguments.
[error] rdd.mapPartitions(showParts)
I call the mapPartitions function as follows:
rdd.mapPartitions(showParts)
The showParts function is defined as follows:
def showParts(iter: Iterator[(Long, Array[String])]) = {
  while (iter.hasNext) {
    val cur = iter.next()
    // Do something with cur
  }
}
What is the proper way of using mapPartitions here?
Upvotes: 4
Views: 4723
Reputation: 9820
You need to return an Iterator from your showParts function. For example:
def onlyEven(numbers: Iterator[Int]): Iterator[Int] =
  numbers.filter(_ % 2 == 0)

def partitionSize(numbers: Iterator[Int]): Iterator[Int] =
  Iterator.single(numbers.length)

val rdd = sc.parallelize(0 to 10)
rdd.mapPartitions(onlyEven).collect()
// Array[Int] = Array(0, 2, 4, 6, 8, 10)
rdd.mapPartitions(partitionSize).collect()
// Array[Int] = Array(2, 3, 3, 3)
// (one size per partition; the split depends on sc.defaultParallelism, 4 here)
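A Spark-free way to see where those numbers come from: the minimal sketch below models an RDD as a list of partitions. It assumes 4 partitions (the output above has four sizes, so sc.defaultParallelism was presumably 4), and slice follows, as far as I know, the start/end offsets that parallelize uses when it splits a collection. mapPartitionsLocal is a hypothetical helper, not a Spark API; it only mimics calling the function once per partition and concatenating the results, as collect() would.

```scala
object LocalPartitions {
  // Split xs into numSlices contiguous chunks with offsets i * n / numSlices
  // (assumed to match how parallelize splits a collection).
  def slice[T](xs: IndexedSeq[T], numSlices: Int): Seq[IndexedSeq[T]] =
    (0 until numSlices).map { i =>
      val start = ((i.toLong * xs.length) / numSlices).toInt
      val end = (((i + 1).toLong * xs.length) / numSlices).toInt
      xs.slice(start, end)
    }

  // Hypothetical stand-in for RDD.mapPartitions followed by collect():
  // f is called once per partition and the returned iterators are concatenated.
  def mapPartitionsLocal[T, U](parts: Seq[IndexedSeq[T]])(f: Iterator[T] => Iterator[U]): Seq[U] =
    parts.flatMap(p => f(p.iterator))

  // The two functions from the answer (size is used here in place of length).
  def onlyEven(numbers: Iterator[Int]): Iterator[Int] =
    numbers.filter(_ % 2 == 0)

  def partitionSize(numbers: Iterator[Int]): Iterator[Int] =
    Iterator.single(numbers.size)

  def main(args: Array[String]): Unit = {
    val parts = slice(0 to 10, 4) // 4 partitions, matching the output above
    println(mapPartitionsLocal(parts)(onlyEven))
    println(mapPartitionsLocal(parts)(partitionSize))
  }
}
```

The point the sketch makes is that the function sees one partition's iterator at a time, which is why partitionSize yields one number per partition rather than a single total.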
Upvotes: 1
Reputation: 13356
The problem is that the function you pass to mapPartitions must have the type Iterator[T] => Iterator[U], i.e. it has to return an Iterator[U]. Your showParts body ends with a while loop and returns nothing, so its inferred return type is Unit.
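To make the required shape concrete, here is a minimal plain-Scala sketch (no Spark needed) of two functions over the question's element type that do satisfy Iterator[T] => Iterator[U]. The names fieldCounts and totalFields, and the work they do, are hypothetical stand-ins for "Do something with cur".

```scala
object MapPartitionsTypes {
  // Element type of the question's RDD.
  type Rec = (Long, Array[String])

  // Per-element work: map returns an Iterator, so the type fits mapPartitions.
  // The explicit return type makes the compiler reject a Unit body right here.
  def fieldCounts(iter: Iterator[Rec]): Iterator[(Long, Int)] =
    iter.map { case (key, fields) => (key, fields.length) }

  // A while loop is still fine if it builds a result; here each partition
  // is reduced to a single total, wrapped in a one-element Iterator.
  def totalFields(iter: Iterator[Rec]): Iterator[Int] = {
    var total = 0
    while (iter.hasNext) {
      val cur = iter.next()
      total += cur._2.length
    }
    Iterator.single(total)
  }
}
```

Either can be passed directly, e.g. rdd.mapPartitions(fieldCounts). Declaring the return type is optional, but it moves the "found: Unit" error to the function's definition, where the cause actually is.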
If you only need the side effects and are fine with mapPartitions producing an empty RDD, you can return an empty Iterator:
def showParts(iter: Iterator[(Long, Array[String])]) = {
  while (iter.hasNext) {
    val cur = iter.next()
    // Do something with cur
  }
  // mapPartitions needs an Iterator[U] back, so return an empty one
  Iterator.empty
}
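A small plain-Scala check of this pattern, with "do something" passed in as a hypothetical visit callback so the effect can be observed. It also shows a related trap: the while loop exhausts iter, so returning iter instead of Iterator.empty would not hand the records back either.

```scala
object EmptyResultDemo {
  // Element type of the question's RDD.
  type Rec = (Long, Array[String])

  // The answer's pattern; visit is a hypothetical parameter standing in
  // for "Do something with cur".
  def showParts(visit: Rec => Unit)(iter: Iterator[Rec]): Iterator[Rec] = {
    while (iter.hasNext) {
      visit(iter.next())
    }
    Iterator.empty // nothing to emit, but still an Iterator
  }

  // Trap: after the loop iter is exhausted, so this also returns no records.
  def showPartsThenIter(visit: Rec => Unit)(iter: Iterator[Rec]): Iterator[Rec] = {
    while (iter.hasNext) {
      visit(iter.next())
    }
    iter
  }
}
```

Two Spark-side points are worth keeping in mind. mapPartitions is a lazy transformation, so the loop only runs once an action such as count() is called on the result. And if you only need the side effects, RDD.foreachPartition accepts an Iterator[T] => Unit and runs immediately as an action, so the question's original showParts could be passed to it unchanged.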
Upvotes: 3