Reputation: 40969
I am using Spark to create an "inverted index" that will map an English token back to the documentIds where the token is found. Given existing data of the form:
documentId1, token
documentId2, token
I would like to create an inverted index in (key, value) form:
token, List(documentId1, documentId2, documentId3, ...)
where the value is a list of documentIds that are sorted and distinct (unique).
Here is what I have so far:
// List of (documentId, token) pairs
var data = Array((100, "spark"), (50, "spark"), (50, "spark"), (1, "apache"), (3, "apache"), (2, "apache"))
var myrdd = sc.parallelize(data)
var myrddGrouped = myrdd.map(pair => (pair._2, pair._1)).groupByKey()
// myrddGrouped: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[226] at groupByKey at <console>:31
myrddGrouped.foreach(println)
// (apache,CompactBuffer(1, 3, 2))
// (spark,CompactBuffer(100, 50, 50))
As you can see, I am using groupByKey(), but the result value is a CompactBuffer, not a List. How can I apply "distinct" and "sort" to it?
Upvotes: 0
Views: 1537
Reputation: 17
var data = Array((100, "spark"), (50, "spark"), (50, "spark"), (1, "apache"), (3, "apache"), (2, "apache"))
var myrdd = sc.parallelize(data)
// toSet removes duplicates; .sorted is still needed, because a Set's iteration order is not guaranteed
var myrddGrouped = myrdd.map(pair => (pair._2, pair._1)).groupByKey().mapValues(_.toSet.toList.sorted).collect
// myrddGrouped: Array[(String, List[Int])] = Array((spark,List(50, 100)), (apache,List(1, 2, 3)))
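An equivalent variant is to deduplicate before the shuffle, so duplicate pairs are never sent to the reducers at all. A minimal sketch, reusing myrdd from the question:
// Drop duplicate (token, documentId) pairs first, then group and sort each list
val grouped = myrdd.map(pair => (pair._2, pair._1))
  .distinct()
  .groupByKey()
  .mapValues(_.toList.sorted)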
Upvotes: 0
Reputation: 2442
I would recommend aggregating into a set instead of using groupByKey. That way duplicates are eliminated during the aggregation itself, and you can then convert the set to a List and apply your sort.
Using some of the suggestions in the comments, perhaps something like the following should work:
val input = sc.parallelize(Array((100, "spark"), (50, "spark"), (50, "spark"), (1, "apache"), (3, "apache"), (2, "apache")))
val setRDD = input.map(_.swap).aggregateByKey(Set[Int]())(_ + _, _ ++ _)
val sortedListRDD = setRDD.mapValues(_.toList.sorted)
sortedListRDD.foreach(println)
// (spark,List(50, 100))
// (apache,List(1, 2, 3))
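If the placeholder underscores in the aggregateByKey call are hard to read, here is the same aggregation written with explicit parameters (a sketch reusing the input RDD above):
val setRDD2 = input.map(_.swap).aggregateByKey(Set.empty[Int])(
  (acc, id) => acc + id, // seqOp: add one documentId to the partition-local set
  (a, b) => a ++ b // combOp: union the sets coming from different partitions
)
val sortedListRDD2 = setRDD2.mapValues(_.toList.sorted)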
A small note: you should declare your RDDs as val, since RDDs are immutable. It is possible to reassign a var to a new RDD, but since you don't seem to be doing that, I would just use vals.
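For example:
val ids = sc.parallelize(Seq(1, 2, 3))
// ids = sc.parallelize(Seq(4, 5, 6)) // does not compile: reassignment to val
var mutableIds = sc.parallelize(Seq(1, 2, 3))
mutableIds = mutableIds.filter(_ > 1) // compiles, but only useful if you actually rebind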
Upvotes: 4