add-semi-colons

Reputation: 18810

scala-spark: How to filter RDD after groupby

I started with an RDD of pipe-separated strings. I have processed the data and gotten it into the following format:

((0001F46468,239394055),(7665710590658745,-414963169),0,1420276980302)
((0001F46468,239394055),(8016905020647641,183812619),1,1420347885727)
((0001F46468,239394055),(6633110906332136,294201185),1,1420398323110)
((0001F46468,239394055),(6633110906332136,294201185),0,1420451687525)
((0001F46468,239394055),(7722056727387069,1396896294),1,1420537469065)
((0001F46468,239394055),(7722056727387069,1396896294),1,1420623297340)
((0001F46468,239394055),(8045651092287275,-4814845),1,1420720722185)
((0001F46468,239394055),(5170029699836178,-1332814297),0,1420750531018)
((0001F46468,239394055),(7722056727387069,1396896294),0,1420807545137)
((0001F46468,239394055),(4784119468604853,1287554938),1,1421050087824) 

Just to give a high-level description of the data: you can think of the first element of the main tuple (the first inner tuple) as a user identifier, the second tuple as a product identifier, and the third element as the user's preference for that product. (For future reference I am going to refer to the data set above as val userData.)
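For reference, here is a minimal sketch of how such an RDD could be built locally for experimenting. The element types (String/Long identifiers, Int preference, Long timestamp), the object name, and the local SparkContext setup are assumptions for illustration, not part of the original job:

import org.apache.spark.{SparkConf, SparkContext}

object UserDataSample {
  def main(args: Array[String]): Unit = {
    // local context just for experimenting; the real job presumably runs on a cluster
    val sc = new SparkContext(new SparkConf().setAppName("userData-sample").setMaster("local[*]"))

    // assumed shape: ((userId, userHash), (productId, productHash), preference, timestamp)
    val userData = sc.parallelize(Seq(
      (("0001F46468", 239394055L), (6633110906332136L, 294201185L), 1, 1420398323110L),
      (("0001F46468", 239394055L), (6633110906332136L, 294201185L), 0, 1420451687525L),
      (("0001F46468", 239394055L), (7722056727387069L, 1396896294L), 1, 1420537469065L)
    ))

    userData.collect().foreach(println)
    sc.stop()
  }
}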

My goal is that if a user has cast both a positive (1) and a negative (0) preference for a product, only the record with the positive preference is kept. For example, from:

((0001F46468,239394055),(6633110906332136,294201185),1,1420398323110)
((0001F46468,239394055),(6633110906332136,294201185),0,1420451687525)

I only want to keep

((0001F46468,239394055),(6633110906332136,294201185),1,1420398323110) 

So I grouped the records by the user-product tuple, e.g. ((0001F46468,239394055),(6633110906332136,294201185)):

val groupedFiltered = userData.groupBy(x => (x._1, x._2)).map(u => {
  // the for loop has no yield, so its body is discarded and the function returns Unit
  for (k <- u._2) {
    if (k._3 > 0)
      u
  }
})

But that returns empty tuples.

So I took the following approach:

val groupedFiltered = userData.groupBy(x => (x._1, x._2)).flatMap(u => u._2).filter(m => m._3 > 0)

((47734739656882457,-1782798434),(7585453414177905,-461779195),1,1422013413082)
((47734739656882457,-1782798434),(7585453414177905,-461779195),1,1422533237758)
((55218449094787901,-1374432022),(6227831620534109,1195766703),1,1420410603596)
((71212122719822610,-807015489),(6769904840922490,1642054117),1,1422549467554)
((75414197560031509,1830213715),(6724015489416254,-1389654186),1,1420196951100)
((60422797294995441,734266951),(6335216393920738,1528026712),1,1421161253600)
((35091051395844216,451349158),(8135854751464083,-1751839326),1,1422083101033)
((16647193023519619,990937787),(5384884550662007,-910998857),1,1420659873572)
((43355867025936022,-945669937),(7336240855866885,518993644),1,1420880078266)
((12188366927481231,-2007889717),(5336507724485344,363519858),1,1420827788022)

This was promising, but it looks like it is taking out every record that has a zero, whereas I only want that when the user has both a 1 and a 0 for the same item: in that case keep only the record with the 1.
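To make the gap concrete, here is a small plain-Scala sketch (no Spark; the rows are made up) showing that a blanket filter on the preference also drops a product for which the user only ever cast a 0, while one reading of the intended rule keeps it:

// two products for one user: p1 has both a 1 and a 0, p2 only a 0
val sample = Seq(
  (("u1", 1L), ("p1", 10L), 1, 100L),
  (("u1", 1L), ("p1", 10L), 0, 101L),
  (("u1", 1L), ("p2", 20L), 0, 102L)
)

// blanket filter: the 0-only product p2 disappears entirely
val filtered = sample.filter(_._3 > 0)
// List(((u1,1),(p1,10),1,100))

// intended rule: per user/product group, drop the 0s only when a 1 exists
val perGroup = sample.groupBy(r => (r._1, r._2)).values.flatMap { recs =>
  if (recs.exists(_._3 > 0)) recs.filter(_._3 > 0) else recs
}
// keeps ((u1,1),(p1,10),1,100) and ((u1,1),(p2,20),0,102)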

Upvotes: 4

Views: 15624

Answers (1)

Peter Neyens

Reputation: 9820

You could keep only the maximum user preference from the grouped results.

userData
 // group by user and product
 .groupBy(x => (x._1, x._2))
 // only keep the maximum user preference per user/product
 .mapValues(_.maxBy(_._3))
 // only keep the values
 .values
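
Assembled into an expression (the val name below is just for illustration), this keeps, for the duplicate pair shown in the question, only the record with preference 1. mapValues and values are available here because groupBy produces a pair RDD of ((user, product), Iterable[record]):

val groupedFiltered = userData
  .groupBy(x => (x._1, x._2))   // RDD[((user, product), Iterable[record])]
  .mapValues(_.maxBy(_._3))     // keep the record with the highest preference per group
  .values                       // drop the grouping key again

groupedFiltered.collect().foreach(println)
// e.g. ((0001F46468,239394055),(6633110906332136,294201185),1,1420398323110)

Note that maxBy keeps exactly one record per user/product pair, so repeated 1 entries collapse to a single record, and a pair that only ever has a 0 keeps one 0 record.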

Upvotes: 7
