osk

Reputation: 810

Spark transformation that withdraws objects from a ListBuffer in an RDD[ListBuffer] and creates new entries in the RDD

I have an RDD of type RDD[ListBuffer[(Array[String], Long)]]. For simplicity, call it RDD[X], where X is a list of obj objects, X[obj].

I want a function that takes an RDD[X] as input and outputs a new RDD[X], i.e. a transformation. This transformation creates new X lists by taking some obj out of an existing X, putting them into a new X, and effectively "appending" that new list to the RDD.

I haven't found anything in Spark that supports this directly. Right now the only solution I can think of is to call collect() and manage most of this on the driver, but that is obviously not good. Any ideas?

Basically something like this:

val data: RDD[ListBuffer[(Array[String], Long)]] = ??? // the input RDD
// some transformation that calls some function:
// some (Array[String], Long) entries are moved into an entirely new ListBuffer in outData, while others may be removed completely
val outData: RDD[ListBuffer[(Array[String], Long)]] = ??? // the transformed RDD

Let's say we have a starting RDD that contains one ListBuffer of 7 elements:

Element1 (in ListBuffer1)

Element2 (in ListBuffer1)

Element3 (in ListBuffer1)

Element4 (in ListBuffer1)

Element5 (in ListBuffer1)

Element6 (in ListBuffer1)

Element7 (in ListBuffer1)

And after the transformation the RDD will have the following contents:

Element1 (in ListBuffer1)

Element2 (in ListBuffer1)

Element4 (in ListBuffer2)

Element5 (in ListBuffer2)

Element6 (in ListBuffer2)

Some elements have been moved to a new ListBuffer in the RDD, while two elements (Element3 and Element7) were removed completely.

I am using Spark 1.6.0.

Upvotes: 0

Views: 111

Answers (1)

Thang Nguyen

Reputation: 1110

You can transform each ListBuffer into a collection of ListBuffers (say, a List of ListBuffer), then call flatMap on the RDD so that each inner ListBuffer becomes its own entry.

Below is a dummy POC.

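// spark.sparkContext requires Spark 2.x; on Spark 1.6 use the SparkContext (sc) directly.
val rdd = spark.sparkContext.parallelize(Seq(List(1,2,3,4), List(11,22,76,44)))
// Split each list into odd and even sublists, then flatten so each sublist becomes its own RDD entry.
val flattenRdd = rdd.map(s => List(s.filter(_ % 2 == 1), s.filter(_ % 2 == 0)))
    .flatMap(s => s)
flattenRdd.collect().foreach(s => println(s.mkString(",")))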

1,3
2,4
11
22,76,44
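The same idea can be applied to the type from the question. The following is only a rough sketch: the rules shouldDrop and shouldMove, the object name SplitBuffers, and the sample values are hypothetical stand-ins for whatever logic actually decides which entries are removed or moved. It creates the SparkContext directly, so it should also fit Spark 1.6, and it merges the map and flatMap steps into a single flatMap.

```scala
import scala.collection.mutable.ListBuffer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SplitBuffers {
  type Entry = (Array[String], Long)

  // Hypothetical rules, for illustration only:
  // entries with a negative Long are removed, entries >= 100 move to a new buffer.
  def shouldDrop(e: Entry): Boolean = e._2 < 0
  def shouldMove(e: Entry): Boolean = e._2 >= 100

  // Per-buffer function: returns zero, one, or two buffers.
  def split(buf: ListBuffer[Entry]): Seq[ListBuffer[Entry]] = {
    val kept = buf.filterNot(shouldDrop)           // remove dropped entries
    val (moved, stay) = kept.partition(shouldMove) // separate the entries that move
    Seq(stay, moved).filter(_.nonEmpty)            // skip empty buffers
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("split-buffers").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      // One ListBuffer of 7 entries, mirroring the example in the question.
      val data: RDD[ListBuffer[Entry]] = sc.parallelize(Seq(
        ListBuffer(
          (Array("e1"), 1L), (Array("e2"), 2L), (Array("e3"), -3L),
          (Array("e4"), 104L), (Array("e5"), 105L), (Array("e6"), 106L),
          (Array("e7"), -7L))))

      // flatMap turns every buffer returned by split into its own RDD entry.
      val outData: RDD[ListBuffer[Entry]] = data.flatMap(buf => split(buf))

      outData.collect().foreach(b => println(b.map(_._1.mkString).mkString(",")))
    } finally {
      sc.stop()
    }
  }
}
```

With these sample values, the result holds two buffers: one with e1 and e2, and one with e4, e5 and e6, while e3 and e7 are gone. Everything runs on the executors, so no collect() on the driver is needed for the transformation itself; the collect() here only prints the result.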

Upvotes: 1
