David Portabella

Reputation: 12710

Spark foreachPartition: how to get the index of each partition?

With Spark's foreachPartition, how can I get the index of the current partition (or a sequence number, or anything else that identifies the partition)?

val docs: RDD[String] = ...

println("num partitions: " + docs.getNumPartitions)

docs.foreachPartition((it: Iterator[String]) => {
  println("partition index: " + ???)
  it.foreach(...)
})

Upvotes: 9

Views: 9446

Answers (2)

Yuval Itzchakov

Reputation: 149538

Not exactly identical, but you can use RDD.mapPartitionsWithIndex and return an Iterator[Unit] as a result:

val rdd: RDD[Unit] = docs.mapPartitionsWithIndex { case (idx, it) =>
  println("partition index: " + idx)
  it.foreach(...)
  Iterator.empty // the function must return an Iterator, not Unit
}

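But then you have to remember to materialize the RDD by running an action on it (for example rdd.count()), because mapPartitionsWithIndex is a lazy transformation and nothing runs until an action is called.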
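To illustrate the point about materialization, here is a minimal sketch, assuming a local SparkContext and a toy four-element RDD. The object name, the accumulator name and the sample data are made up for the example; the accumulator is only there to show that the per-partition function does not run until an action is called.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object LazyPartitionIndexDemo {
  // Returns how many times the per-partition function had run
  // before and after the action was called.
  def run(sc: SparkContext): (Long, Long) = {
    // Toy data (assumption): four strings split over two partitions.
    val docs: RDD[String] = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)
    val calls = sc.longAccumulator("partitionCalls") // hypothetical name

    val rdd: RDD[Unit] = docs.mapPartitionsWithIndex { case (idx, it) =>
      println("partition index: " + idx)
      calls.add(1)
      it.foreach(_ => ()) // stand-in for the real per-element work
      Iterator.empty      // the function must return an Iterator
    }

    val before = calls.sum // the transformation is lazy, so nothing has run yet
    rdd.count()            // an action forces every partition to be processed
    val after = calls.sum
    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("lazy-partition-index")
    val sc = new SparkContext(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```

Any action would do in place of count(). Note that accumulator updates made inside a transformation can be applied more than once if a task is retried, so this counter is a demonstration aid rather than something to rely on in production.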

An alternative would be to use mapPartitionsWithIndex for the logic that transforms the data, and then use foreachPartition (or foreachRDD, if you are in Spark Streaming) only to send the data externally.
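The split described above might look roughly like the following sketch. It assumes a plain RDD (not a DStream), so the output step uses foreachPartition on the tagged RDD; println stands in for a real external sink, and the object and method names are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object TagThenSendDemo {
  // Transformation step: pair every document with the index of its partition.
  def tagWithIndex(docs: RDD[String]): RDD[(Int, String)] =
    docs.mapPartitionsWithIndex { case (idx, it) => it.map(doc => (idx, doc)) }

  // Output step: println is a stand-in for a real external sink (assumption).
  def send(tagged: RDD[(Int, String)]): Unit =
    tagged.foreachPartition { (it: Iterator[(Int, String)]) =>
      it.foreach { case (idx, doc) => println(s"partition $idx -> $doc") }
    }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("tag-then-send")
    val sc = new SparkContext(conf)
    try {
      // Toy data (assumption): four strings split over two partitions.
      val docs = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)
      send(tagWithIndex(docs))
    } finally sc.stop()
  }
}
```

Because foreachPartition is itself an action, no separate materialization step is needed here, and the index travels with each record instead of being looked up inside the output code.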

Upvotes: 6

Alper t. Turker

Reputation: 35229

You can use TaskContext (see "How to get ID of a map task in Spark?"):

import org.apache.spark.TaskContext

rdd.foreachPartition((it: Iterator[String]) => {
  // Partition id of the task that is processing this partition
  println(TaskContext.getPartitionId())
})
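As a fuller sketch of the TaskContext approach, assuming a local SparkContext and toy data (the object and method names are hypothetical), the following prints the partition id from inside foreachPartition and also compares it with the index that mapPartitionsWithIndex passes in:

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

object TaskContextDemo {
  // For each partition, pairs the index passed by mapPartitionsWithIndex
  // with the id reported by TaskContext inside the same task.
  def indexAndTaskId(docs: RDD[String]): Array[(Int, Int)] =
    docs.mapPartitionsWithIndex { case (idx, _) =>
      Iterator((idx, TaskContext.getPartitionId()))
    }.collect()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("task-context-demo")
    val sc = new SparkContext(conf)
    try {
      // Toy data (assumption): four strings split over two partitions.
      val docs = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)

      docs.foreachPartition { (it: Iterator[String]) =>
        val id = TaskContext.getPartitionId()
        println(s"partition index: $id, elements: ${it.size}")
      }

      indexAndTaskId(docs).foreach(println)
    } finally sc.stop()
  }
}
```

TaskContext.getPartitionId() is meant to be called from code running inside a task; called on the driver, where there is no active TaskContext, it returns 0, so it should not be used there to identify a partition.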

Upvotes: 17
