Rohan Nayak

Reputation: 233

Scala/Spark : Flattening the DataFrame using RDD only functions

I have the DataFrame below and want to flatten it using only RDD functions. Can anyone help?

Input DataFrame:

   +---------+-------------+-----------------+-----+----------------+------------------------------------------------------+
   |TPNB     |unitOfMeasure|locationReference|types|types           |effectiveDateTime                                     |
   +---------+-------------+-----------------+-----+----------------+------------------------------------------------------+
   |079562193|EA           |0810             |STORE|[SELLABLE, HELD]|[2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z]|
   +---------+-------------+-----------------+-----+----------------+------------------------------------------------------+

Output:

TPNB        unitOfMeasure   locationReference   types   types       effectiveDateTime
079562193   EA              0810                STORE   SELLABLE    2015-10-09T00:55:23.6345Z
079562193   EA              0810                STORE   HELD        2015-10-09T00:55:23.6345Z

I was trying something like this, which doesn't seem to be working.

    final_output.map(value=>((value(0),value(1),value(2),value(3)),value(5),value(6) )).map{
      case(key,value)=>value.map(records=>(key,records))
    }

Upvotes: 0

Views: 309

Answers (2)

koiralo

Reputation: 23109

This is what you are looking for using only RDD operations: convert the 5th and 6th fields into a Map, then create a row for each entry.

  import spark.implicits._

  val data = spark.sparkContext
    .parallelize(
      Seq(
        ("079562193",
         "EA",
         "0810",
         "STORE",
         Array("SELLABLE", "HELD"),
         Array("2015-10-09T00:55:23.6345Z", "2015-10-09T00:55:23.6345Z"))
      ))

  val result = data
    // zip the two arrays and turn them into a Map of (type -> dateTime)
    .map(row => (row._1, row._2, row._3, row._4, row._5.zip(row._6).toMap))
    // flatMap (not map) so each Map entry becomes its own output row
    .flatMap(r => r._5.map(v => (r._1, r._2, r._3, r._4, v._1, v._2)))

  result.collect().foreach(println)

(079562193,EA,0810,STORE,SELLABLE,2015-10-09T00:55:23.6345Z)
(079562193,EA,0810,STORE,HELD,2015-10-09T00:55:23.6345Z)

Upvotes: 1

Leo C

Reputation: 22449

To transform using RDD only functions, you can do something similar to the following after converting your dataframe to RDD (e.g. via df.rdd):

val rdd = sc.parallelize(Seq(
    ("079562193", "EA", "0810", "STORE", List("SELLABLE", "HELD"), List("2015-10-09T00:55:23.6345Z", "2015-10-09T00:55:23.6345Z"))
  )).
  map{ case (t, u, l, y, ts, ds) => ((t, u, l, y), (ts, ds)) }.
  flatMapValues{ case (x, y) => x zip y }.
  map{ case ((t, u, l, y), (ts, ds)) => Seq(t, u, l, y, ts, ds) }

rdd.collect.foreach(println)
List(079562193, EA, 0810, STORE, SELLABLE, 2015-10-09T00:55:23.6345Z)
List(079562193, EA, 0810, STORE, HELD, 2015-10-09T00:55:23.6345Z)
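If you are starting from the original DataFrame rather than a hand-built RDD, the same zip-and-flatMap approach works on the `RDD[Row]` returned by `df.rdd`. A minimal sketch, assuming a DataFrame `df` with the question's schema; the positional indices and `getSeq` element types are assumptions based on the sample data:

    // Assumes columns in order: TPNB, unitOfMeasure, locationReference,
    // types (scalar), types (array), effectiveDateTime (array)
    val flattened = df.rdd
      .map(row => (
        (row.getString(0), row.getString(1), row.getString(2), row.getString(3)),
        row.getSeq[String](4) zip row.getSeq[String](5)))
      .flatMapValues(identity)  // one output row per zipped pair
      .map { case ((t, u, l, y), (ts, ds)) => (t, u, l, y, ts, ds) }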

Upvotes: 1
