JetS79

Reputation: 71

Sorting an RDD in Spark

I have a dataset listing general items bought by customers. Each record in the csv lists the items purchased by one customer, from left to right. For example (shortened sample):

Bicycle, Helmet, Gloves
Shoes, Jumper, Gloves
Television, Hat, Jumper, Playstation 5

I am looking to put this in an RDD in Scala and count how often each item appears.

case class SalesItemSummary(SalesItemDesc: String, SalesItemCount: String)
val rdd_1 = sc.textFile("Data/SalesItems.csv")
val rdd_2 = rdd_1.flatMap(line => line.split(",")).countByValue();

Above is a short code sample. The first line is the case class (not used yet). The second line reads the data from the csv into rdd_1. Easy enough. The third line uses flatMap to split each line on the comma and then counts each value, so "Gloves" and "Jumper" above would each have a count of 2 and the others a count of 1. The result looks like a collection of tuples. So far so good.

Next, I want to sort rdd_2 to list the top 3 most purchased items. Can I do this with an RDD, or do I need to convert the RDD into a DataFrame to sort it? If so, how do I do it?

How do I apply the case class from line 1 to rdd_2, which seems to be a list of tuples? Should I take this approach at all?

Thanks in advance

Upvotes: 0

Views: 51

Answers (1)

mck

Reputation: 42352

The count in the case class should be an integer. If you want to keep the results as an RDD, I'd suggest using reduceByKey rather than countByValue, because countByValue returns a Map[String, Long] to the driver rather than an RDD.
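
If the number of distinct items is small enough to fit on the driver, the Map that countByValue returns can also be sorted locally with ordinary Scala collections. The following is only a sketch: the `counts` map is a hand-written stand-in for the countByValue result on the sample data, and `top3Local` is a name made up for illustration.

```scala
// Stand-in for rdd_1.flatMap(_.split(", ")).countByValue(),
// which is a local Map on the driver, not an RDD (values typed by hand here).
val counts: scala.collection.Map[String, Long] = Map(
  "Bicycle" -> 1L, "Helmet" -> 1L, "Gloves" -> 2L, "Shoes" -> 1L,
  "Jumper" -> 2L, "Television" -> 1L, "Hat" -> 1L, "Playstation 5" -> 1L
)

// Sort by count descending; break ties by item name so the result is deterministic.
val top3Local: Seq[(String, Long)] =
  counts.toSeq.sortBy { case (item, n) => (-n, item) }.take(3)

top3Local.foreach(println)
// (Gloves,2)
// (Jumper,2)
// (Bicycle,1)
```

This avoids a second Spark job, but it only makes sense when the full set of counts fits comfortably in driver memory.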

Also, I'd suggest splitting by ", " (a comma followed by a space) rather than "," to avoid leading spaces in the item names.
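
To illustrate the difference on one of the sample lines, here is a small plain-Scala sketch. Trimming each field is shown as an alternative that should also cope with files that are inconsistent about the space after the comma; whether your file is inconsistent in that way is an assumption, and `line`, `raw` and `items` are illustrative names.

```scala
val line = "Television, Hat, Jumper, Playstation 5"

// Splitting on a bare comma keeps the space that follows each comma.
val raw = line.split(",")
// raw(1) is " Hat", with a leading space

// Trimming each field works whether or not a space follows the comma.
val items = line.split(",").map(_.trim)
// items(1) is "Hat"
```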

case class SalesItemSummary(SalesItemDesc: String, SalesItemCount: Int)

val rdd_1 = sc.textFile("Data/SalesItems.csv")

val rdd_2 = rdd_1.flatMap(_.split(", "))
                 .map((_, 1))
                 .reduceByKey(_ + _)
                 .map(line => SalesItemSummary(line._1, line._2))

rdd_2.collect()
// Array[SalesItemSummary] = Array(SalesItemSummary(Gloves,2), SalesItemSummary(Shoes,1), SalesItemSummary(Television,1), SalesItemSummary(Bicycle,1), SalesItemSummary(Helmet,1), SalesItemSummary(Hat,1), SalesItemSummary(Jumper,2), SalesItemSummary(Playstation 5,1))

To sort the RDD, you can use sortBy:

val top3 = rdd_2.sortBy(_.SalesItemCount, false).take(3)

top3
// Array[SalesItemSummary] = Array(SalesItemSummary(Gloves,2), SalesItemSummary(Jumper,2), SalesItemSummary(Shoes,1))
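
As a possible alternative to sorting the whole RDD, RDD.top takes an Ordering and returns only the n largest elements. The sketch below rebuilds rdd_2 from the three sample lines with sc.parallelize instead of reading the csv, and obtains a local SparkContext via getOrCreate so it can run outside spark-shell; both are assumptions made for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

case class SalesItemSummary(SalesItemDesc: String, SalesItemCount: Int)

// Reuses the existing context in spark-shell, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("top3-sketch").setMaster("local[*]"))

// Sample lines stand in for sc.textFile("Data/SalesItems.csv").
val rdd_2 = sc.parallelize(Seq(
    "Bicycle, Helmet, Gloves",
    "Shoes, Jumper, Gloves",
    "Television, Hat, Jumper, Playstation 5"))
  .flatMap(_.split(", "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .map { case (item, n) => SalesItemSummary(item, n) }

// top(3) returns the three largest elements under the given ordering,
// without a full sort of the RDD.
val top3 = rdd_2.top(3)(Ordering.by[SalesItemSummary, Int](_.SalesItemCount))
```

With the sample data, the two items with a count of 2 (Gloves and Jumper) come first; which of the count-1 items fills the third slot is not determined by the ordering.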

Upvotes: 3
