Pari
Pari

Reputation: 1493

Secondary Sort using Apache Spark 1.6

I am referring web link http://codingjunkie.net/spark-secondary-sort/ to implement secondary sort in my spark job.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.Partitioner

package Ssort {
case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long) {
      implicit def orderingBySerialNum[A <: DeviceKey] : Ordering[A] = {
       Ordering.by(fk => (fk.serialNum, fk.eventDate, fk.EventTs * -1))
    }
}

class DeviceKeyPartitioner(partitions: Int) extends Partitioner {
    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

    override def numPartitions: Int = partitions

    override def getPartition(key: Any): Int = {
      val k = key.asInstanceOf[DeviceKey]
      k.serialNum.hashCode() % numPartitions
    }
}

object SparkApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Application").setMaster("local[2]")
    val sc = new SparkContext(conf)    
    val t = sc.parallelize(List(((DeviceKey("2","100",1),1)),(DeviceKey("2","100",3),1)), 1)
     t.repartitionAndSortWithinPartitions(partitioner)

  }
}
} 

I am getting error as - value repartitionAndSortWithinPartitions is not a member of org.apache.spark.rdd.RDD[(DeviceKey, Int)]

Can somebody have a look?

Thanks & Regards Pari

Upvotes: 1

Views: 253

Answers (1)

Pari
Pari

Reputation: 1493

This answer is shared by Yong Zhang through spark user email group.

ANSWER

The error message indeed is not very clear.

What you did wrong is that the repartitionAndSortWithinPartitions not only requires PairRDD, but also OrderedRDD. Your case class as key is NOT Ordered.

Either you extends it from Ordered, or provide a companion object to do the implicit Ordering.

scala> spark.version
res1: String = 2.1.0

scala> case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long) 
extends Ordered[DeviceKey] {
     |   import scala.math.Ordered.orderingToOrdered
     |   def compare(that: DeviceKey): Int = 
     |      (this.serialNum, this.eventDate, this.EventTs * -1) compare 
     |      (that.serialNum, that.eventDate, that.EventTs * -1)
     | }
defined class DeviceKey

scala>

scala> val t = sc.parallelize(List(((DeviceKey("2","100",1),1)),
(DeviceKey("2","100",3),1)), 1)
t: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala>

scala> class DeviceKeyPartitioner(partitions: Int) extends org.apache.spark.Partitioner {
     |     require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
     |
     |     override def numPartitions: Int = partitions
     |
     |     override def getPartition(key: Any): Int = {
     |       val k = key.asInstanceOf[DeviceKey]
     |       k.serialNum.hashCode() % numPartitions
     |     }
     | }
defined class DeviceKeyPartitioner

scala>

scala> t.repartitionAndSortWithinPartitions(new DeviceKeyPartitioner(2))
res0: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ShuffledRDD[1] at repartitionAndSortWithinPartitions at <console>:30

Upvotes: 1

Related Questions