denairPete

Reputation: 21

How to group by a select number of fields in an RDD looking for duplicates based on those fields

I am new to Scala and Spark, and I am working in the Spark shell.
I need to group and sort by the first three fields of this file, looking for duplicates. Within each group, I need to append a counter to the third field, starting at "1" and incrementing by 1 for each record in the group, then resetting to "1" at the start of the next group. When a group has no duplicates, the appended counter is simply "1".

The CSV file contains the following:
("00111","00111651","4444","PY","MA")
("00111","00111651","4444","XX","MA")
("00112","00112P11","5555","TA","MA")

val csv = sc.textFile("file.csv")
val recs = csv.map(line => line.split(","))

If the logic is applied correctly to the example above, the resulting RDD of recs would look like this:
("00111","00111651","44441","PY","MA")
("00111","00111651","44442","XX","MA")
("00112","00112P11","55551","TA","MA")

Upvotes: 1

Views: 956

Answers (2)

The Archetypal Paul

Reputation: 41769

This also groups the data, changes it, and puts it back, here on a plain List of tuples.

 val lists= List(("00111","00111651","4444","PY","MA"),
("00111","00111651","4444","XX","MA"),
("00112","00112P11","5555","TA","MA"))

val grouped = lists.groupBy{case(a,b,c,d,e) => (a,b,c)}
val indexed = grouped.mapValues(
               _.zipWithIndex
                .map {case ((a,b,c,d,e), idx) => (a,b,c + (idx+1).toString,d,e)})

val unwrapped = indexed.flatMap(_._2) 
// e.g. (the order of the groups is not guaranteed):
//List((00112,00112P11,55551,TA,MA),
//     (00111,00111651,44441,PY,MA),
//     (00111,00111651,44442,XX,MA))

Version working on Arrays (of arbitrary length >= 3):

val lists= List(Array("00111","00111651","4444","PY","MA"),
Array("00111","00111651","4444","XX","MA"),
Array("00112","00112P11","5555","TA","MA"))
// Key on a List: Array keys compare by reference, so equal prefixes would never group together
val grouped = lists.groupBy{_.take(3).toList}
val indexed = grouped.mapValues(
      _.zipWithIndex
       .map {case (Array(a,b,c, rest@_*), idx) => Array(a,b,c+ (idx+1).toString) ++ rest})

val unwrapped = indexed.flatMap(_._2)
// e.g. (the order of the groups is not guaranteed):
// List(Array(00112, 00112P11, 55551, TA, MA),
//      Array(00111, 00111651, 44441, PY, MA),
//      Array(00111, 00111651, 44442, XX, MA))
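
The question also asks for the groups to come out sorted, which iterating over a Map does not guarantee. Below is a minimal sketch of one way to get a deterministic order with plain Scala collections, assuming every record has at least three fields. The names `NumberDuplicates` and `numberDuplicates` are made up for illustration and are not part of the answer above.

```scala
object NumberDuplicates {
  // Hypothetical helper: append a 1-based counter to the third field
  // within each group of records sharing the first three fields.
  // Assumes every record has at least three fields.
  def numberDuplicates(recs: Seq[Array[String]]): Seq[Array[String]] =
    recs
      .groupBy(r => (r(0), r(1), r(2)))   // tuple keys compare by content
      .toSeq
      .sortBy { case (key, _) => key }    // sort the groups by their key
      .flatMap { case (_, group) =>
        group.zipWithIndex.map { case (r, idx) =>
          r.updated(2, r(2) + (idx + 1))  // copy of r with the counter appended
        }
      }

  def main(args: Array[String]): Unit = {
    val recs = Seq(
      Array("00111", "00111651", "4444", "PY", "MA"),
      Array("00111", "00111651", "4444", "XX", "MA"),
      Array("00112", "00112P11", "5555", "TA", "MA"))
    numberDuplicates(recs).foreach(r => println(r.mkString(",")))
    // prints:
    // 00111,00111651,44441,PY,MA
    // 00111,00111651,44442,XX,MA
    // 00112,00112P11,55551,TA,MA
  }
}
```

Within a group, the counter follows the order in which the records appear in the input, since `groupBy` on a Seq keeps that order.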

Upvotes: 2

Justin Pihony

Reputation: 67115

How about grouping the data, changing it, and putting it back:

val csv = sc.parallelize(List(
  "00111,00111651,4444,PY,MA",
  "00111,00111651,4444,XX,MA",
  "00112,00112P11,5555,TA,MA"
))
val recs = csv.map(_.split(","))
val grouped = recs.groupBy(line=>(line(0),line(1), line(2)))
val numbered = grouped.mapValues(dataList=>
      dataList.zipWithIndex.map{case(data, idx) => data match {
          case Array(fst,scd,thd,rest@_*) => Array(fst,scd,thd+(idx+1)) ++ rest
      }
    })
numbered.flatMap{case (key, values)=>values}
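
One caveat: Spark does not guarantee the order of elements within a group after `groupBy`, so which duplicate gets "1" and which gets "2" may differ between runs. If that matters, one option is to sort each group before numbering it. The helper below is a sketch under that assumption; `NumberGroup` and `numberGroup` are hypothetical names, and sorting on the whole joined line is just one possible tie-break.

```scala
object NumberGroup {
  // Hypothetical helper: sort one group's rows, then append a 1-based
  // counter to the third field. Assumes each row has at least three fields.
  def numberGroup(rows: Iterable[Array[String]]): Seq[Array[String]] =
    rows.toSeq
      .sortBy(_.mkString(","))   // assumed tie-break: the whole line
      .zipWithIndex
      .map { case (row, idx) => row.updated(2, row(2) + (idx + 1)) }
}
```

With the `grouped` RDD from above, this could replace the `mapValues` step: `grouped.flatMap { case (_, rows) => NumberGroup.numberGroup(rows) }`.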

Upvotes: 3
