Reputation: 1623
I am new to Spark. If I have a RDD consists of key-value pairs, what is the efficient way to return a subset of this RDD containing the keys that appear more than a certain times in the original RDD?
For example, if my original data RDD is like this:
val dataRDD=sc.parallelize(List((1,34),(5,3),(1,64),(3,67),(5,0)),3)
I want to get a new RDD, in which the keys appear more than once in dataRDD. The newRDD should contains these tuples: (1,34),(5,3),(1,64),(5,0). How can I get this new RDD? Thank you very much.
Upvotes: 3
Views: 1880
Reputation: 330353
Count keys and filter infrequent:
val counts = dataRDD.keys.map((_, 1)).reduceByKey(_ + _)
val infrequent = counts.filter(_._2 == 1)
If number of infrequent values is to large to be handled in memory you can use PairRDDFunctions.subtractByKey
:
dataRDD.subtractByKey(infrequent)
otherwise a broadcast variable:
val infrequentKeysBd = sc.broadcast(infrequent.keys.collect.toSet)
dataRDD.filter{ case(k, _) => !infrequentKeysBd.value.contains(k)}
If number of frequent keys is very low you can filter frequent keys and use a broadcast variable as above:
val frequent = counts.filter(_._2 > 1)
val frequentKeysBd = ??? // As before
dataRDD.filter{case(k, _) => frequentKeysBd.value.contains(k)}
Upvotes: 4