Shams Tabraiz Alam
Shams Tabraiz Alam

Reputation: 111

Filtering RDD and List in Scala

I have an RDD with following structure

         RDD[((String), List[(Int,String)])] 

And it contain this data

        ((John),List((4,00A), (5,00A), (15,00B), (15,00C)))
        ((Billing),List((7,00A)))
        ((Root),List((1,00A), (2,00B), (3,00C)))
        ((Marsh),List((2,00B), (3,00C)))

Now i want to filter it by following rules

1 : If list does not contain '00A' then do not return it

2 : If list contain '00A' then return all '00A' items and also '00C' items in the list. So result should look like this.

      ((John),List((4,00A), (5,00A), (15,00C)))
      ((Billing),List((7,00A)))
      ((Root),List((1,00A), (3,00C)))

EDIT: Adding code posted in comment.

I have tried this:

val rdd = df
  .rdd
  .map{case Row(id: Int, name: String, code: String) => ((name), List((id, code)))}
  .reduceByKey(_ ++ _); 

val r = rdd
  .map{case (t, list) => { 
          val tempList = list.map{case (id, code) => (id, code)} 
          val newList = tempList.map{case (id, "00A") => (id, "00A")
                                     case (id, "00C") => (id, "00C")
                                     case (id, code) => List.empty } 

          (t, newList)}
   };

Upvotes: 1

Views: 428

Answers (1)

The Archetypal Paul
The Archetypal Paul

Reputation: 41749

It's straightforward

It's not really an RDD/Spark question so I've done it with Lists. If you were using lists, you could do the filter/map as one collect but collect means something quite different for RDD

Assuming your data was really meant to look like this:

 val xs = List(("John",List((4,"00A"), (5,"00A"), (15,"00B"), (15,"00C"))),
            ("Billing",List((7,"00A"))),
            ("Root",List((1,"00A"), (2,"00B"), (3,"00C"))),
            ("Marsh", List((2,"00B"), (3,"00C"))))

Then first we filter to get only the records with "00A" somewhere.

 val filtered = xs.filter{case (key, ys) =>ys.exists(y=>y._2 == "00A")}

Them map over tte result to return only the "00A" and "00C" ones

 val result = filtered.map{case (key, ys) => 
                             (key, ys.filter(y=>y._2 == "00A" || y._2 == "00C"))}
 //> result  : List[(String, List[(Int, String)])] = List(
 // (John,List((4,00A), (5, 00A), (15,00C))),
 // (Billing,List((7,00A))), 
 // (Root,List((1,00A), (3,00C))))

Upvotes: 1

Related Questions