3tbraden

Reputation: 827

repartitionAndSortWithinPartitions is not a member of RDD[(K, V)]

I am trying to do a secondary sort in Spark with Scala, following this blog, and I have the following code:

case class TokenZidKey(token: String, zid: Long)

object TokenZidKey {
    implicit def orderingByTokenZid[A <: TokenZidKey]: Ordering[A] = {
        Ordering.by(tzk => (tzk.token, tzk.zid))
    }
}

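import org.apache.spark.Partitioner

class TokenZidPartitioner(partitions: Int) extends Partitioner {
    override def numPartitions: Int = partitions
    // Partition by token only; floorMod keeps the index non-negative for negative hash codes.
    override def getPartition(key: Any): Int =
        Math.floorMod(key.asInstanceOf[TokenZidKey].token.hashCode(), numPartitions)
}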

object SetSim {
    def main(args: Array[String]): Unit = {
        ...
        val linesWithZid = dataLines.zipWithIndex();
        def mapZidByToken(r: (String, Long)): Iterable[(TokenZidKey, String)] = {
            for (t <- r._1.split(" ")) yield (new TokenZidKey(t, r._2), r._1);
        }
        val tokensWithZid = linesWithZid.flatMap(mapZidByToken);

        val tokenZidPartitioner = new TokenZidPartitioner(tokensWithZid.context.defaultParallelism);
        val tokenZidsWithSecondarySort = tokensWithZid.repartitionAndSortWithinPartitions(tokenZidPartitioner).map(r => {
            (r._1.token, (r._1.zid, r._2))
        }).toDF("token", "zid_with_line");
    }
}

Here tokensWithZid is an org.apache.spark.rdd.RDD[(TokenZidKey, String)], but I still get:

value repartitionAndSortWithinPartitions is not a member of org.apache.spark.rdd.RDD[(TokenZidKey, String)]

Any idea why this happens and how I can fix it? Is it that my custom TokenZidKey is not recognized as a proper, sortable RDD key?

Upvotes: 0

Views: 204

Answers (1)

3tbraden

Reputation: 827

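It turns out that the code works. The issue is that I tested it in spark-shell: when the case class and its companion object are entered on separate lines, spark-shell does not treat them as if they were defined in the same file, so the object is not registered as the companion of the case class. Presumably the implicit Ordering defined in the object is then not found for TokenZidKey, and the sorting methods never become available on the RDD.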
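To illustrate why a missing Ordering shows up as a "not a member" error: as far as I understand, repartitionAndSortWithinPartitions lives in OrderedRDDFunctions, which Spark attaches to a pair RDD through an implicit conversion that requires an Ordering for the key type. The sketch below imitates that mechanism in plain Scala, without Spark. Box, OrderedBoxOps, WithOrdering and NoOrdering are hypothetical stand-ins I made up for this example, not Spark classes.

```scala
import scala.language.implicitConversions

// Hypothetical stand-in for a pair RDD.
class Box[K, V](val items: Seq[(K, V)])

// Hypothetical stand-in for OrderedRDDFunctions: it only exists for keys that have an Ordering.
class OrderedBoxOps[K: Ordering, V](box: Box[K, V]) {
  def sortedByKey: Seq[(K, V)] = box.items.sortBy(_._1)
}

object Box {
  // The conversion applies only when an implicit Ordering[K] can be found.
  implicit def toOrderedOps[K: Ordering, V](box: Box[K, V]): OrderedBoxOps[K, V] =
    new OrderedBoxOps(box)
}

// Key type whose companion object supplies the Ordering.
case class WithOrdering(n: Int)
object WithOrdering {
  implicit val byN: Ordering[WithOrdering] = Ordering.by(_.n)
}

// Key type with no Ordering anywhere in implicit scope.
case class NoOrdering(n: Int)

object ImplicitViewDemo {
  // Compiles: the Ordering is found in the companion of WithOrdering.
  def sortedPairs: Seq[(WithOrdering, String)] =
    new Box(Seq(WithOrdering(2) -> "b", WithOrdering(1) -> "a")).sortedByKey

  // Uncommenting the next line should fail to compile with a "not a member" style error,
  // because no Ordering[NoOrdering] exists and the conversion is therefore not applied.
  // def broken = new Box(Seq(NoOrdering(1) -> "a")).sortedByKey

  def main(args: Array[String]): Unit = println(sortedPairs)
}
```

The compiler does not report the missing Ordering directly. It only reports that the method does not exist on the original type, which matches the error in the question.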

To test this in spark-shell, enter the case class and its companion object on a single line, like the following:

case class TokenZidKey(token: String, zid: Long); object TokenZidKey {
    implicit def orderingByTokenZid[A <: TokenZidKey]: Ordering[A] = {
        Ordering.by(tzk => (tzk.token, tzk.zid))
    }
}
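A quick way to check that the Ordering is actually picked up, independent of Spark, is to ask the compiler for it explicitly. This is a minimal sketch in plain Scala. OrderingCheck and the sample keys are names and data I introduced for illustration; only TokenZidKey and its companion come from the code above.

```scala
case class TokenZidKey(token: String, zid: Long)

object TokenZidKey {
  // Same ordering as above: by token first, then by zid.
  implicit def orderingByTokenZid[A <: TokenZidKey]: Ordering[A] =
    Ordering.by(tzk => (tzk.token, tzk.zid))
}

object OrderingCheck {
  // Made-up sample keys, deliberately out of order.
  val keys: List[TokenZidKey] =
    List(TokenZidKey("b", 1L), TokenZidKey("a", 2L), TokenZidKey("a", 1L))

  def sortedKeys: List[TokenZidKey] = {
    // This line fails to compile if the companion's implicit is not in scope.
    val ord = implicitly[Ordering[TokenZidKey]]
    keys.sorted(ord)
  }

  def main(args: Array[String]): Unit = println(sortedKeys)
}
```

If this is tried in spark-shell rather than compiled from a file, the case class and object still need to be entered together. Besides the single-line form, the REPL's :paste mode should also work, since it compiles everything pasted as one unit.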

Upvotes: 1
