Reputation: 53
I have two RDDs that wrap the following arrays:
Array((3,Ken), (5,Jonny), (4,Adam), (3,Ben), (6,Rhonda), (5,Johny))
Array((4,Rudy), (7,Micheal), (5,Peter), (5,Shawn), (5,Aaron), (7,Gilbert))
I need to write code such that, if I provide 3 as input, it returns
Array((3,Ken), (3,Ben))
If the input is 6, the output should be
Array((6,Rhonda))
I tried something like this:
val list3 = list1.union(list2)
list3.reduceByKey(_+_).collect
list3.reduceByKey(6).collect
Neither of these worked. Can anyone help me with a solution to this problem?
Upvotes: 3
Views: 5771
Reputation: 12794
Given the following, which you would have to define yourself:
import org.apache.spark.SparkContext
// Provide your SparkContext and inputs here
val sc: SparkContext = ???
val array1: Array[(Int, String)] = ???
val array2: Array[(Int, String)] = ???
val n: Int = ???
val rdd1 = sc.parallelize(array1)
val rdd2 = sc.parallelize(array2)
You can use union and filter to reach your goal:
rdd1.union(rdd2).filter(_._1 == n)
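As a minimal sketch of the union-then-filter approach, here is a standalone program that uses the two arrays from the question. The object name `FilterByKeyExample`, the helper `lookup`, the `local[*]` master and the application name are assumptions made so that it runs on its own; in a real job you would reuse your existing SparkContext.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FilterByKeyExample {
  // Hypothetical helper: unions both data sets and keeps the pairs whose key equals n
  def lookup(sc: SparkContext, n: Int): Array[(Int, String)] = {
    val rdd1 = sc.parallelize(Seq(
      (3, "Ken"), (5, "Jonny"), (4, "Adam"), (3, "Ben"), (6, "Rhonda"), (5, "Johny")))
    val rdd2 = sc.parallelize(Seq(
      (4, "Rudy"), (7, "Micheal"), (5, "Peter"), (5, "Shawn"), (5, "Aaron"), (7, "Gilbert")))
    rdd1.union(rdd2).filter(_._1 == n).collect()
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for a standalone run
    val conf = new SparkConf().setMaster("local[*]").setAppName("filter-by-key")
    val sc = new SparkContext(conf)
    try {
      // Keeps the pairs (3,Ken) and (3,Ben)
      println(lookup(sc, 3).mkString("Array(", ", ", ")"))
      // Keeps the single pair (6,Rhonda)
      println(lookup(sc, 6).mkString("Array(", ", ", ")"))
    } finally {
      sc.stop()
    }
  }
}
```

Note that `collect()` brings the result back to the driver, which is fine for small outputs like these but should be avoided for large ones.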
Since filtering by key is something you will probably want to do on several occasions, it makes sense to encapsulate it in its own function.
It would also be useful if this function worked with any type of key, not only Ints.
You can express this in the old RDD API as follows:
import org.apache.spark.rdd.RDD

// Keeps only the pairs whose key (first element) equals k
def filterByKey[K, V](rdd: RDD[(K, V)], k: K): RDD[(K, V)] =
  rdd.filter(_._1 == k)
You can use it as follows:
val rdd = rdd1.union(rdd2)
val filtered = filterByKey(rdd, n)
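To illustrate that the function is not tied to Int keys, here is a small sketch that applies it to String keys. The data (`byTeam`), the object name and the local Spark setup are hypothetical and only serve the example; `filterByKey` is repeated so the block is self-contained.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object GenericKeyExample {
  // Same generic function as above
  def filterByKey[K, V](rdd: RDD[(K, V)], k: K): RDD[(K, V)] =
    rdd.filter(_._1 == k)

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for a standalone run
    val conf = new SparkConf().setMaster("local[*]").setAppName("generic-key")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical data keyed by String instead of Int
      val byTeam = sc.parallelize(Seq(("red", 1), ("blue", 2), ("red", 3)))
      // K is inferred as String and V as Int
      val reds = filterByKey(byTeam, "red").collect()
      println(reds.mkString(", "))
    } finally {
      sc.stop()
    }
  }
}
```

The compiler infers `K` and `V` from the arguments, so the call site looks the same whatever the key type is.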
Let's look at this method in a little more detail.
It lets you filter by key an RDD that contains generic pairs, where the type of the first item is K and the type of the second item is V (from key and value). It also accepts a key of type K that is used to filter the RDD.
It then uses the filter function, which takes a predicate (a function that goes from the element type, in this case the pair (K, V), to a Boolean) and makes sure that the resulting RDD only contains items that satisfy this predicate.
We could also have written the body of the function as:
rdd.filter(pair => pair._1 == k)
or
rdd.filter { case (key, value) => key == k }
but we took advantage of the _ wildcard to express the fact that we want to act on the first (and only) parameter of this anonymous function.
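The three spellings of the predicate are interchangeable. As a rough sketch, the snippet below checks this on an ordinary Scala `Seq`, whose `filter` takes the same kind of predicate, so no Spark setup is needed; the object name `PredicateForms` and the sample values are assumptions for illustration only.

```scala
object PredicateForms {
  val pairs = Seq((3, "Ken"), (5, "Jonny"), (3, "Ben"))
  val k = 3

  // Placeholder syntax: _ stands for the single parameter of the anonymous function
  val withWildcard = pairs.filter(_._1 == k)
  // Explicitly named parameter
  val withNamedParam = pairs.filter(pair => pair._1 == k)
  // Pattern match that destructures the pair; the value is ignored
  val withPattern = pairs.filter { case (key, _) => key == k }

  def main(args: Array[String]): Unit = {
    assert(withWildcard == withNamedParam && withNamedParam == withPattern)
    println(withWildcard) // prints List((3,Ken), (3,Ben))
  }
}
```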
To use it, you first parallelize your arrays into RDDs, call union on them and then invoke the filterByKey function with the number you want to filter by (as shown in the example).
Upvotes: 2