Reputation: 851
I will come to the actual question, but please bear with my use case first. Suppose I got the following rddStud
from somewhere:
val rddStud: RDD[(String,Student)] = ???
where 'String' is some random string and 'Student' is case class Student(name: String, id: String, arrivalTime: Long, classId: String).
I am using Student only as an example; the actual business logic has a much more complicated class with many fields.
What I want to achieve is: students with the same id must be processed in ascending order of their arrivalTime.
For this here's what I am doing:
//Get RDD from Student.id -> Student
val studMapRdd: RDD[(String,Student)] = rddStud.map { case (_, student) =>
  (student.id, student)
}
//Make sure all students with same student.id are in same partition.
//I can potentially use groupByKey/combineByKey.... etc, but I don't see much performance difference
val studPartitionRdd: RDD[(String,Student)] = studMapRdd.partitionBy(new HashPartitioner(studMapRdd.getNumPartitions))
val studSortedRdd: RDD[(String,Student)] = studPartitionRdd.sortBy({ case (studentId, student) =>
  student.arrivalTime
}, ascending = true)
studSortedRdd.foreachPartition { itr =>
  itr.foreach { case (studentId, student) =>
    val studentName = student.name
    val time = student.arrivalTime
    //send the studentName and time combination for additional processing
  }
}
My questions are:
Much appreciated.
Upvotes: 2
Views: 3303
Reputation: 330413
Neither the choice between synchronous (foreach(Partition)) and asynchronous (foreach(Partition)Async) submission, nor the choice between element-wise and partition-wise access, will affect execution order. In the first case the important difference is blocking vs. non-blocking execution; in the second case it is the way in which data is exposed, but the actual execution mechanism is more or less the same.
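The blocking vs. non-blocking distinction itself can be sketched with plain Scala Futures (an illustration only, not Spark's FutureAction API): either submission style produces the same result, only whether the caller waits differs.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

def process(xs: Seq[Int]): Int = xs.sum

val data = Seq(1, 2, 3)

// Synchronous submission: blocks the caller until processing completes.
val syncResult = process(data)

// Asynchronous submission: returns immediately; the work runs on another thread.
val asyncFuture: Future[Int] = Future(process(data))
val asyncResult = Await.result(asyncFuture, 10.seconds)

// Same computed value either way; only the blocking behavior differs.
println(syncResult == asyncResult)
```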
Sorting after repartitioning is not a valid approach. sortBy will trigger a full shuffle and won't preserve the existing data distribution. If you want to preserve the existing data layout you can either sort within a subsequent mapPartitions phase or, even better, use repartitionAndSortWithinPartitions.
class StudentIdPartitioner[V](n: Int) extends org.apache.spark.Partitioner {
  def numPartitions: Int = n
  def getPartition(key: Any): Int = {
    // hashCode can be negative on the JVM; shift negative remainders into [0, n).
    val x = key.asInstanceOf[Student].id.hashCode % n
    x + (if (x < 0) n else 0)
  }
}
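The `x + (if (x < 0) n else 0)` adjustment matters because `hashCode % n` can be negative on the JVM. The same arithmetic can be checked in plain Scala without Spark:

```scala
// Same arithmetic as getPartition above: map any hash code into [0, n).
def nonNegativeMod(hash: Int, n: Int): Int = {
  val x = hash % n
  x + (if (x < 0) n else 0)
}

val n = 4
// String hash codes may be negative; verify every result lands in [0, n).
val samples = Seq("a", "id-123", "zzzz").map(_.hashCode) ++ Seq(Int.MinValue + 1, -7, 0, 7)
println(samples.map(nonNegativeMod(_, n)).forall(p => p >= 0 && p < n)) // true
```

Spark's own HashPartitioner applies the same non-negative-modulo trick to key hash codes.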
val rddStud: RDD[Student] = ???

val partitioner = new StudentIdPartitioner(rddStud.getNumPartitions)
val arrTimeOrdering = scala.math.Ordering.by[Student, Long](_.arrivalTime)

{
  // Key by the Student itself (value is unused), so both the partitioner
  // and the implicit ordering operate directly on Student.
  implicit val ord = arrTimeOrdering
  rddStud.map((_, null)).repartitionAndSortWithinPartitions(partitioner)
}
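The `Ordering.by` instance used above can be exercised without Spark; sorting within partitions relies on exactly this kind of ordering. A minimal sketch, using a local stand-in for the Student class and plain collections:

```scala
case class Student(name: String, id: String, arrivalTime: Long, classId: String)

// Same ordering as in the answer: compare students by arrivalTime.
val arrTimeOrdering = Ordering.by[Student, Long](_.arrivalTime)

val students = Seq(
  Student("b", "1", 30L, "c1"),
  Student("a", "1", 10L, "c1"),
  Student("c", "1", 20L, "c1")
)

// Sort locally with the ordering Spark would use within each partition.
val sorted = students.sorted(arrTimeOrdering)
println(sorted.map(_.arrivalTime)) // List(10, 20, 30)
```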
Upvotes: 2