Reputation: 827
I am trying to do a secondary sort in Spark with Scala, following this blog, and I have the following code:
import org.apache.spark.Partitioner

// Composite key: partition by token, sort by (token, zid) within each partition.
case class TokenZidKey(token: String, zid: Long)

object TokenZidKey {
  implicit def orderingByTokenZid[A <: TokenZidKey]: Ordering[A] = {
    Ordering.by(tzk => (tzk.token, tzk.zid))
  }
}

class TokenZidPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  // Note: String.hashCode can be negative, so this may return a negative partition index;
  // wrapping it in math.abs (or Spark's nonNegativeMod) avoids that.
  override def getPartition(key: Any): Int =
    key.asInstanceOf[TokenZidKey].token.hashCode % numPartitions
}
object SetSim {
  def main(args: Array[String]) {
    ...
    // Pair each line with its index; the index (zid) becomes the secondary sort key.
    val linesWithZid = dataLines.zipWithIndex()

    // Emit one (TokenZidKey, line) pair per token in the line.
    def mapZidByToken(r: (String, Long)): Iterable[(TokenZidKey, String)] = {
      for (t <- r._1.split(" ")) yield (new TokenZidKey(t, r._2), r._1)
    }

    val tokensWithZid = linesWithZid.flatMap(mapZidByToken)
    val tokenZidPartitioner = new TokenZidPartitioner(tokensWithZid.context.defaultParallelism)

    // repartitionAndSortWithinPartitions needs an implicit Ordering[TokenZidKey] in scope;
    // toDF needs import spark.implicits._ (predefined in spark-shell).
    val tokenZidsWithSecondarySort = tokensWithZid
      .repartitionAndSortWithinPartitions(tokenZidPartitioner)
      .map(r => (r._1.token, (r._1.zid, r._2)))
      .toDF("token", "zid_with_line")
  }
}
where tokensWithZid is an org.apache.spark.rdd.RDD[(TokenZidKey, String)], but I still get
value repartitionAndSortWithinPartitions is not a member of org.apache.spark.rdd.RDD[(TokenZidKey, String)]
Any idea why, and how can I fix this? Is it that my custom TokenZidKey is not recognized as a proper, sortable RDD key?
Upvotes: 0
Views: 204
Reputation: 827
It turns out the code works; the problem was that I tested it in spark-shell, and when the case class and the object are entered as separate statements, spark-shell does not treat the object as a true companion of the case class. Without a real companion object, the implicit Ordering[TokenZidKey] it defines is not in implicit scope, so the implicit conversion that adds repartitionAndSortWithinPartitions (it requires an Ordering on the key) never applies, which is why the method appears to be "not a member".
So if you want to test this in spark-shell, enter the case class and the companion object as a single statement, like the following:
case class TokenZidKey(token: String, zid: Long); object TokenZidKey {
  implicit def orderingByTokenZid[A <: TokenZidKey]: Ordering[A] = {
    Ordering.by(tzk => (tzk.token, tzk.zid))
  }
}
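Alternatively, spark-shell's :paste mode compiles everything entered before Ctrl-D as one unit, so the object becomes a true companion as well. A minimal sketch, assuming an interactive spark-shell session (the implicitly check at the end is just one way to confirm that the companion's Ordering is picked up):
// In spark-shell: type :paste, enter both definitions, then press Ctrl-D.
case class TokenZidKey(token: String, zid: Long)

object TokenZidKey {
  implicit def orderingByTokenZid[A <: TokenZidKey]: Ordering[A] =
    Ordering.by(tzk => (tzk.token, tzk.zid))
}

// If the object is a real companion, its implicit Ordering resolves here;
// otherwise this line fails to compile.
implicitly[Ordering[TokenZidKey]]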
Upvotes: 1