SN.JS

Reputation: 3

How to generate a new RDD from another RDD according to specific logic

I am new to Spark and have a problem I don't know how to solve. The data in my RDD looks like this:

(1,{A,B,C,D})
(2,{E,F,G})
......

I know RDDs are immutable, so I want to transform my RDD into a new RDD that looks like this:

11 A,B
12 B,C
13 C,D
21 E,F
22 F,G
......

How can I generate the new keys and extract each pair of adjacent elements?

Upvotes: 0

Views: 1694

Answers (1)

Rohan Aletty

Reputation: 2442

Assuming your collection is something similar to a List, you could do something like:

val rdd2 = rdd1.flatMap { case (key, values) => 
  for (value <- values.sliding(2).zipWithIndex) 
    yield (key.toString + value._2, value._1) 
}

For each record, we apply a sliding window of size 2 over the values, zip each window with its zero-based index, and emit a tuple whose key is the original key with the window index appended and whose value is the window itself. We use flatMap rather than map so that the windows produced for each input record are flattened into separate output records.
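To see what the sliding and zipWithIndex steps produce on their own, here is a minimal sketch on a plain Scala List, with no Spark involved. The object name SlidingDemo and the helper windows are made up for illustration; only sliding, zipWithIndex, and toList come from the standard library.

```scala
object SlidingDemo {
  // Hypothetical helper: pair each adjacent window of size 2
  // with its zero-based position in the list.
  def windows(values: List[String]): List[(List[String], Int)] =
    values.sliding(2).zipWithIndex.toList

  def main(args: Array[String]): Unit = {
    windows(List("A", "B", "C", "D")).foreach(println)
    // prints:
    // (List(A, B),0)
    // (List(B, C),1)
    // (List(C, D),2)
  }
}
```

One thing to watch: sliding(2) on a single-element list yields one window containing just that element, so if short lists can occur you may want to filter on the window size before building keys.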

When run in spark-shell, I'm seeing the following output on your example:

scala> val rdd1 = sc.parallelize(Array((1,List("A","B","C","D")), (2,List("E","F","G"))))
rdd1: org.apache.spark.rdd.RDD[(Int, List[String])] = ParallelCollectionRDD[0] at parallelize at <console>:21

scala> val rdd2 = rdd1.flatMap { case (key, values) => for (value <- values.sliding(2).zipWithIndex) yield (key.toString + value._2, value._1) }
rdd2: org.apache.spark.rdd.RDD[(String, Seq[String])] = MapPartitionsRDD[1] at flatMap at <console>:23

scala> rdd2.foreach(println)
...
(10,List(A, B))
(11,List(B, C))
(12,List(C, D))
(20,List(E, F))
(21,List(F, G))

One caveat: the output key (e.g. 10, 11) grows to 3 digits once a list produces 11 or more windows, i.e. has 12 or more elements. For example, for the input key 1, the 11th window gets the output key 110. I'm not sure whether that fits your use case, but it seemed like a reasonable extension of your request. Given your output key scheme, I would actually suggest something different, such as putting a hyphen between the key and the window index. That prevents collisions later: you'll see 2-10 and 21-0 instead of 210 for both keys.
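A rough sketch of the hyphenated-key idea follows. It runs on a plain Seq standing in for the RDD so it works without a Spark session. The object HyphenKeys, the helper expand, and its start parameter are all hypothetical names. The start parameter is there because the window index is zero-based, while the keys in the question (11, 12, 13) count from 1.

```scala
object HyphenKeys {
  // Hypothetical helper: build "key-index" keys for each adjacent pair.
  // start = 0 reproduces the zero-based index; start = 1 counts from 1
  // as in the question's desired output.
  def expand(key: Int, values: Seq[String], start: Int = 0): List[(String, Seq[String])] =
    values.sliding(2).zipWithIndex.map { case (window, i) =>
      (s"$key-${i + start}", window)
    }.toList

  def main(args: Array[String]): Unit = {
    // A local Seq stands in for rdd1 here (assumption for the sketch).
    val data = Seq((1, List("A", "B", "C", "D")), (2, List("E", "F", "G")))
    data.flatMap { case (k, vs) => expand(k, vs, start = 1) }.foreach(println)
  }
}
```

With a real RDD the same helper should slot into the flatMap, along the lines of rdd1.flatMap { case (k, vs) => HyphenKeys.expand(k, vs, start = 1) }.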

Upvotes: 1
