K P
K P

Reputation: 851

Spark (streaming) RDD foreachPartitionAsync functionality/working

I will come to actual question but please bear with my use-case first. I have the following use-case, say I got rddStud from somewhere:

val rddStud: RDD[(String,Student)] = ???

Where 'String' - some random string and 'Student' - case class Student(name: String, id: String, arrivalTime: Long, classId: String)

I am using Student only as an example - actual business logic has much different complicated class with many fields.

What I want to achieve is - students with same id must be processed in ascending order of their arrivalTime.

For this here's what I am doing:

//Get RDD from Student.id -> Student
val studMapRdd: RDD[(String,Student)] = rddStud.map(tuple => { 
  val student = tuple._2
  (student.id,student)
})

//Make sure all students with same student.id are in same partition.
//I can potentially use groupByKey/combineByKey.... etc, but I don't see much performance difference    
val studPartitionRdd: RDD[(String,Student)] = studMapRdd.partitionBy(new HashPartitioner(studMapRdd.getNumPartitions))

val studSortedRdd: RDD[(String,Student)] = studPartitionRdd.sortBy({ case(studentId,student} => 
    student.arrivalTime
 }, ascending = true)

studSortedRdd.foreachPartition(itr =>{
    itr.foreach{ case (studentId, student) => { 
       val studentName = student.name
       val time = student.arrivalTime 
       //send for additional processing studentName and time combination
    }
})

My questions are:

  1. If I use foreachPartitionAsync - will it process all partitions parallely, but the elements in each partition in order? If not, what's the difference between foreachPartitionAsync and foreachAsync then?
  2. Does the approach of sorting after repartitioning seem reasonable? Or if you could suggest any optimizations in above logic?

Much appreciated.

Upvotes: 2

Views: 3303

Answers (1)

zero323
zero323

Reputation: 330413

Neither choice between synchronous (foreach(Partition)) and asynchronous (foreach(Partition)Async) submission nor choice between element-wise and partition-wise access will affect execution order. In the first case the important difference blocking vs non-blocking execution, in the second case the way in which data is exposed but actual execution mechanism is more or less the same.

Sorting after repartitioning is not a valid approach. sortBy will trigger full shuffle and won't preserve existing data distribution. If you want to preserve existing data layout you can either sort within subsequent mapPartitions phase or even better use repartitionAndSortWithinPartitions.

class StudentIdPartitioner[V](n: Int) extends org.apache.spark.Partitioner {
  def numPartitions: Int = n
  def getPartition(key: Any): Int = {
    val x = key.asInstanceOf[Student].id.hashCode % n
    x + (if (x < 0) n else 0)
  }
}

val rddStud: RDD[Student] = ???
val partitioner = new StudentIdPartitioner(rddStud.getNumPartitions)
val arrTimeOrdering = scala.math.Ordering.by[Student, Long](_.arrivalTime)


{
  implicit val ord = arrTimeOrdering
  rddStud.map((_, null)).repartitionAndSortWithinPartitions(partitioner)
}

Upvotes: 2

Related Questions