Spark foreachPartition: how to get the index of the partition (or a sequence number, or something else that identifies the partition)?
val docs: RDD[String] = ...
println("num partitions: " + docs.getNumPartitions)
docs.foreachPartition((it: Iterator[String]) => {
  println("partition index: " + ???)
  it.foreach(...)
})
Not exactly identical, but you can use RDD.mapPartitionsWithIndex and return an Iterator[Unit] as the result:
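import org.apache.spark.rdd.RDD
val rdd: RDD[Unit] = docs.mapPartitionsWithIndex { case (idx, it) =>
  println("partition index: " + idx)  // idx is the partition index
  it.foreach(...)                     // per-element work, as in the question
  Iterator.empty: Iterator[Unit]      // nothing to emit; the RDD exists only for its side effects
}
rdd.count()  // transformations are lazy, so an action is needed to actually run the code above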
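But then you have to remember to materialize the RDD by calling an action on it (e.g. count()), because mapPartitionsWithIndex is a lazy transformation and nothing runs until an action is triggered.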
An alternative is to use mapPartitionsWithIndex for the logic that transforms the data, and then use foreachPartition (or foreachRDD, if you are working with a DStream in Spark Streaming) only to send the data to the external system.
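A minimal local sketch of that split, assuming Spark 2.x or later on the classpath and a local[2] master. The four-element input, the two partitions, and the sendToSink function are hypothetical stand-ins: sendToSink only prints here, where a real job would write to its external system.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object TagThenSend {
  // Transformation (lazy): attach the partition index to every record.
  def tagWithPartition(docs: RDD[String]): RDD[(Int, String)] =
    docs.mapPartitionsWithIndex { (idx, it) => it.map(doc => (idx, doc)) }

  // Hypothetical sink standing in for the external system; it just prints.
  def sendToSink(batch: Seq[(Int, String)]): Unit =
    batch.foreach { case (idx, doc) => println(s"partition $idx -> $doc") }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("tag-then-send").getOrCreate()
    try {
      val docs = spark.sparkContext.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)
      // Action: only ships the already-tagged records out, one batch per partition.
      tagWithPartition(docs).foreachPartition(it => sendToSink(it.toList))
    } finally spark.stop()
  }
}
```

Keeping the side effect in foreachPartition means the sink receives one batch per partition, so a connection or client can be opened once per partition rather than once per record.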
You can use TaskContext (see "How to get ID of a map task in Spark?"):
import org.apache.spark.TaskContext

rdd.foreachPartition((it: Iterator[String]) => {
  // Index of the partition processed by the task currently running this closure
  println(TaskContext.getPartitionId)
})
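To check that TaskContext reports the partition being processed inside foreachPartition, here is a small local sketch, assuming Spark 2.x or later (for collectionAccumulator) and a local[2] master; the object and method names are made up for illustration. Because foreachPartition returns Unit, each task reports (partitionId, elementCount) back to the driver through an accumulator. On a real cluster, println inside foreachPartition writes to the executors' stdout rather than the driver console, which is another reason to ship values back this way. TaskContext.get().partitionId() is an equivalent way to read the same value.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

object PartitionIdInForeach {
  // Runs foreachPartition and returns (partitionId, elementCount) pairs sorted by partition id.
  def partitionSizes(spark: SparkSession, docs: Seq[String], numPartitions: Int): Seq[(Int, Int)] = {
    val rdd = spark.sparkContext.parallelize(docs, numPartitions)
    // foreachPartition returns Unit, so results travel back through an accumulator.
    val acc = spark.sparkContext.collectionAccumulator[(Int, Int)]("partition sizes")
    rdd.foreachPartition { it =>
      val pid = TaskContext.getPartitionId() // partition handled by this task
      acc.add((pid, it.size))
    }
    val collected = acc.value // java.util.List[(Int, Int)], read on the driver
    (0 until collected.size).map(i => collected.get(i)).sortBy(_._1)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("partition-id").getOrCreate()
    try partitionSizes(spark, Seq("a", "b", "c", "d", "e"), 2).foreach(println)
    finally spark.stop()
  }
}
```

With five elements split into two partitions, parallelize places two elements in partition 0 and three in partition 1, so the sketch should report (0,2) and (1,3).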