DuFei

Reputation: 447

Why can't I use foreach inside mapPartitions in Spark?

I created an RDD with SparkContext using 2 partitions and tried to process its elements with mapPartitions, but I ran into a very strange error. When I write the code like this:

val masterURL = "local[*]"

val conf = new SparkConf().setAppName("KMeans Test").setMaster(masterURL)
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")

val data = sc.textFile("file:/d:/data/kmeans_data.txt")
val parsedData = data.mapPartitions(partition => parseData(partition)).cache()

parsedData.mapPartitions(points =>
  points.map(point =>
    println(point)
  )
)

there are no errors. However, when I replace map with foreach, I get an error:

parsedData.mapPartitions(points =>
  points.foreach(point =>
   println(point)
  )
)

The error is as follows:

Type mismatch, expected: (Iterator[Vector]) => Iterator[NotInferedU], actual: (Iterator[Vector]) => Unit Expression of type Unit doesn't conform to expected type Iterator[U_]

In addition, the first code snippet does not print anything to the console. Why?

Upvotes: 1

Views: 2391

Answers (2)

Álvaro Valencia

Reputation: 1217

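You're getting this error because the foreach method returns Unit, while the function passed to mapPartitions must return an Iterator. Return an iterator instead. Note that foreach consumes the iterator, so handing back points after calling foreach on it would leave the partition empty; printing inside map and returning each element keeps the data intact:

parsedData.mapPartitions { points =>
  points.map { point =>
    println(point)
    point
  }
}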

It should work.
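
One caveat worth checking for yourself: a Scala Iterator can only be traversed once. The sketch below uses plain Scala (no Spark) to compare handing back an iterator after foreach with printing inside map. The object and method names are made up for illustration.

```scala
object IteratorOnce {
  // Prints every element, then hands back the SAME iterator, which is now exhausted.
  def printThenReturn[T](points: Iterator[T]): Iterator[T] = {
    points.foreach(println)
    points
  }

  // Prints each element as it is pulled through and passes it along unchanged.
  def printWhileMapping[T](points: Iterator[T]): Iterator[T] =
    points.map { point =>
      println(point)
      point
    }

  def main(args: Array[String]): Unit = {
    val exhausted = printThenReturn(Iterator(1.0, 2.0, 3.0))
    println(exhausted.hasNext) // false: foreach already consumed everything

    val passedThrough = printWhileMapping(Iterator(1.0, 2.0, 3.0)).toList
    println(passedThrough.size) // 3: the elements survive the printing step
  }
}
```

So if the RDD returned by mapPartitions is used afterwards, the map-based form is the safer of the two.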

In addition, the first code snippet does not print anything to the console. Why?

Because mapPartitions (like map) is a lazy transformation: Spark does not execute it until an action (for example, collect, count or foreach) is called on the resulting RDD.
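
A minimal sketch of that laziness, assuming a local master with 2 threads and a small in-memory dataset in place of the file from the question (the object name and sample values are illustrative only):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyTransformationDemo {
  // Returns the element count so the result can be checked by the caller.
  def run(): Long = {
    val conf = new SparkConf().setAppName("Lazy Demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    try {
      val data = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0), 2)

      // Transformation only: this line schedules nothing and prints nothing.
      val mapped = data.mapPartitions { points =>
        points.map { point =>
          println(point)
          point
        }
      }
      println("transformation declared, no points printed yet")

      // count() is an action, so the println calls above run now.
      mapped.count()
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(s"count = ${run()}")
}
```

The order in which the points are printed is not guaranteed, since the two partitions may be processed in parallel.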

Upvotes: 3

Radhwane Chebaane

Reputation: 864

mapPartitions expects a function that returns a new iterator for each partition (Iterator[Vector] => Iterator[U]); it maps one iterator to another. foreach returns Unit (Scala's equivalent of void), which does not match the expected return type.
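
The shape of that contract can be sketched in plain Scala. applyToPartition below is a hypothetical stand-in that mimics the function type for a single partition; it is not Spark's implementation.

```scala
object SignatureSketch {
  // Hypothetical helper: takes one partition and a function of the same
  // shape that mapPartitions expects, Iterator[T] => Iterator[U].
  def applyToPartition[T, U](partition: Iterator[T])(f: Iterator[T] => Iterator[U]): Iterator[U] =
    f(partition)

  def main(args: Array[String]): Unit = {
    // Compiles: map turns an Iterator[String] into an Iterator[Int].
    val lengths = applyToPartition(Iterator("a", "bb", "ccc"))(words => words.map(_.length))
    println(lengths.toList) // List(1, 2, 3)

    // Does not compile if uncommented: foreach returns Unit, not an Iterator.
    // applyToPartition(Iterator("a", "bb"))(words => words.foreach(println))
  }
}
```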

For printing RDD content, you can use foreachPartition instead of mapPartitions:

parsedData.foreachPartition(points =>
  points.foreach(point =>
    println(point)
  )
)
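
For completeness, here is a self-contained sketch of the same idea, assuming a local master and an in-memory dataset instead of the parsed file (the object name and sample values are illustrative only):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PrintPartitionsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Print Demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    try {
      val data = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0), 2)

      // foreachPartition is an action whose function returns Unit,
      // so calling foreach on the partition iterator type-checks and runs immediately.
      data.foreachPartition { points =>
        points.foreach(println)
      }
    } finally {
      sc.stop()
    }
  }
}
```

In local mode the output shows up in the console. On a cluster, println runs on the executors, so the lines end up in the executors' stdout rather than on the driver; for a small RDD, parsedData.collect().foreach(println) prints on the driver instead.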

Upvotes: 2
