Jeremy

Reputation: 587

Flatten an RDD - Nested lists in value of key value pair

It took me a while to figure this out, and I wanted to share my solution. Improvements are definitely welcome.

References: "Flattening a Scala Map in an RDD" and "Spark Flatten Seq by reversing groupby, (i.e. repeat header for each sequence in it)"

I have an RDD of the form: RDD[(Int, List[(String, List[(String, Int, Float)])])]

Key: Int

Value: List[(String, List[(String, Int, Float)])]

With a goal of flattening to: RDD[(Int, String, String, Int, Float)]

binHostCountByDate.foreach(println)

Gives the example:

(516361, List((2013-07-15, List((s2.rf.ru,1,0.5), (s1.rf.ru,1,0.5))), (2013-08-15, List((p.secure.com,1,1.0)))))

The final RDD should give the following

(516361,2013-07-15,s2.rf.ru,1,0.5)
(516361,2013-07-15,s1.rf.ru,1,0.5)
(516361,2013-08-15,p.secure.com,1,1.0)

Upvotes: 0

Views: 2006

Answers (2)

The Archetypal Paul

Reputation: 41769

It's a simple one-liner (and with destructuring in the for-comprehension we can use better names than _1, _2._1 etc., which makes it easier to be sure we're getting the right result):

// Use an outer list in place of an RDD for test purposes
val t = List((516361,
                 List(("2013-07-15", List(("s2.rf.ru",1,0.5), ("s1.rf.ru",1,0.5))),
                      ("2013-08-15", List(("p.secure.com",1,1.0))))))

// Destructure each level and yield one flat tuple per innermost entry
t flatMap {case (k, xs) => for ((d, ys) <- xs; (dom, a, b) <- ys) yield (k, d, dom, a, b)}
   //> res0: List[(Int, String, String, Int, Double)] = 
       List((516361,2013-07-15,s2.rf.ru,1,0.5),
            (516361,2013-07-15,s1.rf.ru,1,0.5),
            (516361,2013-08-15,p.secure.com,1,1.0))
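For readers less used to for-comprehensions, here is a minimal sketch of roughly what the one-liner expands to: a flatMap for the outer generator and a map for the innermost one. It runs on plain Scala lists rather than an RDD, the object and value names are made up for illustration, and Double stands in for Float as in the worksheet output above.

```scala
object ForComprehensionSketch {
  // Sample data from the question (Double stands in for Float).
  val sample: List[(Int, List[(String, List[(String, Int, Double)])])] =
    List((516361,
      List(("2013-07-15", List(("s2.rf.ru", 1, 0.5), ("s1.rf.ru", 1, 0.5))),
           ("2013-08-15", List(("p.secure.com", 1, 1.0))))))

  // The for-comprehension form used in the answer.
  val viaFor: List[(Int, String, String, Int, Double)] =
    sample.flatMap { case (k, xs) =>
      for ((d, ys) <- xs; (dom, a, b) <- ys) yield (k, d, dom, a, b)
    }

  // Roughly equivalent explicit form: flatMap over the dates,
  // then map over the innermost list of host tuples.
  val viaFlatMap: List[(Int, String, String, Int, Double)] =
    sample.flatMap { case (k, xs) =>
      xs.flatMap { case (d, ys) =>
        ys.map { case (dom, a, b) => (k, d, dom, a, b) }
      }
    }
}
```

Both values hold the same three flattened rows; the same pattern-matching function can be passed to flatMap on an RDD.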

Upvotes: 2

Jeremy

Reputation: 587

My approach is as follows:

I flatten the first key value pair. This "removes" the first list.

val binHostCountForDate = binHostCountByDate.flatMapValues(identity)

Gives me an RDD of the form: RDD[(Int, (String, List[(String, Int, Float)]))]

binHostCountForDate.foreach(println)

(516361,(2013-07-15,List((s2.rf.ru,1,0.5), (s1.rf.ru,1,0.5))))
(516361,(2013-08-15,List((p.secure.com,1,1.0))))

Now I map the first two items (the Int key and the date) into a tuple that becomes the new key, and keep the remaining list as the value. Then I apply the same procedure as above to flatten the new key-value pair.

val binDataRemapKey = binHostCountForDate.map(f =>((f._1, f._2._1), f._2._2)).flatMapValues(identity)

This gives the flattened RDD: RDD[((Int, String), (String, Int, Float))]

If this form is fine, then we are done, but we can go one step further and remove the nested tuples to get the final form we were originally looking for.

val binData = binDataRemapKey.map(f => (f._1._1, f._1._2, f._2._1, f._2._2, f._2._3))

This gives us the final form of: RDD[(Int, String, String, Int, Float)]

We now have a flattened RDD that has preserved the parents of each list.
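The three steps can be checked without a Spark cluster. The sketch below mirrors them on plain Scala lists, under some assumptions: the `flatMapValues` helper defined here is a hypothetical stand-in written for this example (it is not Spark's method, only an imitation of its behaviour on a List), the field names `host`, `count` and `share` are guesses at what the tuple elements mean, and Double is used in place of Float.

```scala
object FlatMapValuesSketch {
  // Hypothetical stand-in for RDD.flatMapValues, defined on a List:
  // expand each value into several values, repeating the key for each.
  def flatMapValues[K, V, U](pairs: List[(K, V)])(f: V => Iterable[U]): List[(K, U)] =
    pairs.flatMap { case (k, v) => f(v).map(u => (k, u)) }

  type Host = (String, Int, Double)

  // Sample data from the question.
  val byDate: List[(Int, List[(String, List[Host])])] =
    List((516361,
      List(("2013-07-15", List(("s2.rf.ru", 1, 0.5), ("s1.rf.ru", 1, 0.5))),
           ("2013-08-15", List(("p.secure.com", 1, 1.0))))))

  // Step 1: remove the outer list, one (key, (date, hosts)) pair per date.
  val forDate: List[(Int, (String, List[Host]))] =
    flatMapValues(byDate)(identity)

  // Step 2: move the date into the key, then flatten the host list.
  val remapped: List[((Int, String), Host)] =
    flatMapValues(forDate.map { case (k, (d, hosts)) => ((k, d), hosts) })(identity)

  // Step 3: unpack the nested tuples into one flat 5-tuple.
  val flat: List[(Int, String, String, Int, Double)] =
    remapped.map { case ((k, d), (host, count, share)) => (k, d, host, count, share) }
}
```

Pattern matching with `case` in steps 2 and 3 does the same job as the `f._1._1`, `f._2._2` accessors, but names each field at the point where it is unpacked.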

Upvotes: 0
