I have an RDD with the following structure:
RDD[((String), List[(Int,String)])]
and it contains this data:
((John),List((4,00A), (5,00A), (15,00B), (15,00C)))
((Billing),List((7,00A)))
((Root),List((1,00A), (2,00B), (3,00C)))
((Marsh),List((2,00B), (3,00C)))
Now I want to filter it by the following rules:
1: If the list does not contain '00A', do not return it.
2: If the list contains '00A', return all '00A' items and also the '00C' items in the list. So the result should look like this:
((John),List((4,00A), (5,00A), (15,00C)))
((Billing),List((7,00A)))
((Root),List((1,00A), (3,00C)))
EDIT: Adding the code posted in a comment.
I have tried this:
val rdd = df
  .rdd
  .map { case Row(id: Int, name: String, code: String) => (name, List((id, code))) }
  .reduceByKey(_ ++ _)

val r = rdd
  .map { case (t, list) =>
    val tempList = list.map { case (id, code) => (id, code) }
    // the last case returns List.empty instead of dropping the element,
    // so '00B' rows are not removed and the element types no longer line up
    val newList = tempList.map {
      case (id, "00A") => (id, "00A")
      case (id, "00C") => (id, "00C")
      case (id, code)  => List.empty
    }
    (t, newList)
  }
It's straightforward. It's not really an RDD/Spark question, so I've done it with Lists. If you were using Lists you could do the filter/map as one collect (see the sketch after the result below), but collect means something quite different for an RDD.
Assuming your data was really meant to look like this:
val xs = List(("John", List((4,"00A"), (5,"00A"), (15,"00B"), (15,"00C"))),
              ("Billing", List((7,"00A"))),
              ("Root", List((1,"00A"), (2,"00B"), (3,"00C"))),
              ("Marsh", List((2,"00B"), (3,"00C"))))
First we filter to get only the records with "00A" somewhere:
val filtered = xs.filter { case (key, ys) => ys.exists(y => y._2 == "00A") }
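// filtered keeps John, Billing and Root; Marsh has no "00A" entry, so rule 1 drops it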
Then map over the result to keep only the "00A" and "00C" entries:
val result = filtered.map { case (key, ys) =>
  (key, ys.filter(y => y._2 == "00A" || y._2 == "00C")) }
//> result : List[(String, List[(Int, String)])] = List(
// (John,List((4,00A), (5,00A), (15,00C))),
// (Billing,List((7,00A))),
// (Root,List((1,00A), (3,00C))))
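As noted above, with plain Lists the two steps can be fused into a single collect, since collect keeps only the elements its partial function matches. A minimal sketch over the same xs:

val combined = xs.collect {
  // the guard plays the role of the filter step (rule 1),
  // the body plays the role of the map step (rule 2)
  case (key, ys) if ys.exists(_._2 == "00A") =>
    (key, ys.filter(y => y._2 == "00A" || y._2 == "00C"))
}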
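Since your data actually lives in an RDD, the same two steps translate directly; a sketch, assuming a SparkContext sc is in scope (rddData here just mirrors your RDD):

import org.apache.spark.rdd.RDD

val rddData: RDD[(String, List[(Int, String)])] = sc.parallelize(xs)

val rddResult = rddData
  .filter { case (_, ys) => ys.exists(_._2 == "00A") }               // rule 1
  .mapValues(ys => ys.filter(y => y._2 == "00A" || y._2 == "00C"))   // rule 2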