Reputation: 810
So I have an RDD of type RDD[ListBuffer[(Array[String], Long)]]. For simplicity we can just call it RDD[X], where X is a list of obj objects.
The idea is that I want a transformation: a function which takes an RDD[X] as input and outputs a new RDD[X]. This transformation will create new X lists by taking some obj out of one X, putting it into a new X, and in effect "appending" that new list to the RDD.
I haven't found anything in Spark that supports this directly. Right now the only solution I can think of is to call collect() and manage most of this at the driver, but that is obviously not good. Any ideas?
Basically something like this:
val data = RDD[ListBuffer[(Array[String], Long)]]
// some transformation that calls some function
// what will happen is some (Array[String], Long) will be moved into an entirely new ListBuffer in outData while some may be completely removed
val outData = RDD[ListBuffer[(Array[String], Long)]]
Let's say we have a starting RDD that contains one ListBuffer of 7 elements:
Element1 (in ListBuffer1)
Element2 (in ListBuffer1)
Element3 (in ListBuffer1)
Element4 (in ListBuffer1)
Element5 (in ListBuffer1)
Element6 (in ListBuffer1)
Element7 (in ListBuffer1)
And after the transformation the RDD will have the following contents:
Element1 (in ListBuffer1)
Element2 (in ListBuffer1)
Element4 (in ListBuffer2)
Element5 (in ListBuffer2)
Element6 (in ListBuffer2)
Some elements have been moved to a new ListBuffer in the RDD while two elements were completely removed.
I am using Spark 1.6.0.
Upvotes: 0
Views: 111
Reputation: 1110
You can transform each ListBuffer into a collection of ListBuffers, say a List of ListBuffer, and then call flatMap on the RDD.
Below is a dummy POC:
// sc is the SparkContext (Spark 1.6 has no SparkSession)
val rdd = sc.parallelize(Seq(List(1, 2, 3, 4), List(11, 22, 76, 44)))
// Split each inner list into two lists (odds and evens), then flatten
val flattenRdd = rdd
  .map(s => List(s.filter(_ % 2 == 1), s.filter(_ % 2 == 0)))
  .flatMap(s => s)
flattenRdd.collect().foreach(s => println(s.mkString(",")))
1,3
2,4
11
22,76,44
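Adapting the same idea to the question's RDD[ListBuffer[(Array[String], Long)]], a minimal sketch might look like the following. The predicates keep and stays are hypothetical placeholders for whatever rule decides which elements are removed and which ones move into a new ListBuffer; replace them with your actual logic.

```scala
import scala.collection.mutable.ListBuffer
import org.apache.spark.{SparkConf, SparkContext}

object SplitBuffers {
  type Elem = (Array[String], Long)

  // Hypothetical rules -- substitute your own:
  // keep: elements that survive at all (here, drop elements 3 and 7)
  def keep(e: Elem): Boolean = e._2 != 3L && e._2 != 7L
  // stays: elements that remain in the original buffer (here, the first two)
  def stays(e: Elem): Boolean = e._2 <= 2L

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("split-buffers").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // One ListBuffer of 7 elements, as in the question's example
    val data = sc.parallelize(Seq(
      ListBuffer((1L to 7L).map(i => (Array(s"Element$i"), i)): _*)
    ))

    // For each ListBuffer: drop unwanted elements, then partition the
    // survivors into the buffer that keeps them and a brand-new buffer;
    // flatMap flattens the per-buffer List[ListBuffer[Elem]] back into
    // a single RDD[ListBuffer[Elem]].
    val outData = data.flatMap { buf =>
      val survivors = buf.filter(keep)
      val (oldBuf, newBuf) = survivors.partition(stays)
      List(oldBuf, newBuf).filter(_.nonEmpty)
    }

    outData.collect().foreach(b => println(b.map(_._2).mkString(",")))
    sc.stop()
  }
}
```

With the placeholder predicates above this reproduces the question's example: the output RDD contains ListBuffer(Element1, Element2) and ListBuffer(Element4, Element5, Element6), with Element3 and Element7 removed.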
Upvotes: 1