Reputation: 233
I have below Dataframe and I want to flatten using only RDD. Can any one help?
Input DataFrame:
+---------+-------------+-----------------+-----+----------------+------------------------------------------------------+ |TPNB |unitOfMeasure|locationReference|types|types |effectiveDateTime | +---------+-------------+-----------------+-----+----------------+------------------------------------------------------+ |079562193|EA |0810 |STORE|[SELLABLE, HELD]|[2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z]| +---------+-------------+-----------------+-----+----------------+------------------------------------------------------+
Output:
TPNB unitOfMeasure locationReference types types effectiveDateTime 079562193 EA 0810 STORE SELLABLE 2015-10-09T00:55:23.6345Z 079562193 EA 0810 STORE HELD 2015-10-09T00:55:23.6345Z
i was trying something like this, which doesnt seems to be working.
final_output.map(value=>((value(0),value(1),value(2),value(3)),value(5),value(6) )).map{ case(key,value)=>value.map(records=>(key,records)) }
Upvotes: 0
Views: 309
Reputation: 23109
This is what you are looking for only on RDD. Convert row 5th and 6th as a Map and create a row for each.
import spark.implicits._
val data = spark.sparkContext
.parallelize(
Seq(
("079562193",
"EA",
"0810",
"STORE",
Array("SELLABLE", "HELD"),
Array("2015-10-09T00:55:23.6345Z", "2015-10-09T00:55:23.6345Z"))
))
val result = data
.map(row => (row._1, row._2, row._3, row._4, (row._5.zip(row._6).toMap)))
.map(r => {
r._5.map(v => (r._1, r._2, r._3, r._4, v._1, v._2))
})
.collect()
.foreach(println)
((079562193,EA,0810,STORE,SELLABLE,2015-10-09T00:55:23.6345Z)
(079562193,EA,0810,STORE,HELD,2015-10-09T00:55:23.6345Z))
Upvotes: 1
Reputation: 22449
To transform using RDD only functions, you can do something similar to the following after converting your dataframe to RDD (e.g. via df.rdd
):
val rdd = sc.parallelize(Seq(
("079562193", "EA", "0810", "STORE", List("SELLABLE", "HELD"), List("2015-10-09T00:55:23.6345Z", "2015-10-09T00:55:23.6345Z"))
)).
map{ case (t, u, l, y, ts, ds) => ((t, u, l, y), (ts, ds)) }.
flatMapValues{ case (x, y) => x zip y }.
map{ case ((t, u, l, y), (ts, ds)) => Seq(t, u, l, y, ts, ds) }
rdd.collect.foreach(println)
List(079562193, EA, 0810, STORE, SELLABLE, 2015-10-09T00:55:23.6345Z)
List(079562193, EA, 0810, STORE, HELD, 2015-10-09T00:55:23.6345Z)
Upvotes: 1