Vinay Kumar

Reputation: 1684

Adding an index to an RDD using a shared mutable state

Take this simple RDD as an example to explain the problem:

val testRDD = sc.parallelize(List((1, 2), (3, 4), (3, 6)))

I have this function to help me implement the indexing:

var sum = 0

def inc(l: Int): Int = {
  sum += l
  sum
}

Now I want to create an ID for each tuple:

val indexedRDD = testRDD.map(x => (x._1, x._2, inc(1)))

The output RDD should be ((1,2,1), (3,4,2), (3,6,3))

But all the index values turned out to be the same; every tuple gets 1:

((1,2,1), (3,4,1), (3,6,1))

Where am I going wrong? Is there another way to achieve this?
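Why every tuple gets 1: Spark serializes the closure passed to map(), including the captured `sum`, and each task increments its own deserialized copy. The counter therefore restarts in every partition, and the driver's `sum` is not updated (see "Understanding closures" in the Spark RDD programming guide). The sketch below is a plain-Scala model of that behaviour, not Spark itself. `ClosureCopyModel`, `slice` and `mapWithCopiedCounter` are hypothetical names, and `slice` assumes contiguous, roughly equal slices like the ones sc.parallelize makes from a local List. Under that model, the all-1 output in the question is consistent with each of the three tuples landing in its own partition.

```scala
// Simplified, Spark-free model (an assumption, not Spark's real code path):
// each partition's task gets a private copy of the captured `sum`.
object ClosureCopyModel {
  // Split `data` into `numSlices` contiguous, roughly equal slices
  // (assumed to approximate how sc.parallelize slices a local List).
  def slice[T](data: List[T], numSlices: Int): List[List[T]] =
    (0 until numSlices).toList.map { i =>
      val start = (i.toLong * data.length / numSlices).toInt
      val end = ((i + 1).toLong * data.length / numSlices).toInt
      data.slice(start, end)
    }

  // Model of testRDD.map(x => (x._1, x._2, inc(1))): every slice starts
  // from the driver's value of `sum`, so the counter restarts per slice.
  def mapWithCopiedCounter(
      data: List[(Int, Int)],
      numSlices: Int,
      driverSum: Int = 0): List[(Int, Int, Int)] =
    slice(data, numSlices).flatMap { part =>
      var sum = driverSum // this task's private copy of the captured variable
      def inc(l: Int): Int = { sum += l; sum }
      part.map(x => (x._1, x._2, inc(1)))
    }

  def main(args: Array[String]): Unit = {
    val data = List((1, 2), (3, 4), (3, 6))
    println(mapWithCopiedCounter(data, numSlices = 3)) // List((1,2,1), (3,4,1), (3,6,1))
    println(mapWithCopiedCounter(data, numSlices = 2)) // List((1,2,1), (3,4,1), (3,6,2))
  }
}
```

With two slices the model gives (3,6,2), because (3,4) and (3,6) share a task; the IDs then depend on partitioning, which is exactly why a shared counter is not a reliable index.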

Upvotes: 3

Views: 1025

Answers (1)

Justin Pihony

Reputation: 67115

You are looking for:

def zipWithIndex(): RDD[(T, Long)]
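Applied to the question's testRDD, a minimal sketch might look like the following. It assumes spark-core on the classpath and a local master just for trying it out; `ZipWithIndexExample` and `withOneBasedIndex` are hypothetical names. zipWithIndex() is 0-based and returns a Long, so the sketch adds 1 to match the 1-based IDs in the question.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ZipWithIndexExample {
  // zipWithIndex() pairs each element with a 0-based Long index ordered by
  // partition, then by position within the partition; +1 makes it 1-based.
  def withOneBasedIndex(rdd: RDD[(Int, Int)]): RDD[(Int, Int, Long)] =
    rdd.zipWithIndex().map { case ((a, b), idx) => (a, b, idx + 1) }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, only for trying the example out.
    val conf = new SparkConf().setAppName("zipWithIndex-example").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val testRDD = sc.parallelize(List((1, 2), (3, 4), (3, 6)))
      withOneBasedIndex(testRDD).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

This should print (1,2,1), (3,4,2) and (3,6,3). Unlike the shared counter, zipWithIndex computes each partition's starting offset from the sizes of the partitions before it, which is why it triggers an extra Spark job when the RDD has more than one partition.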

However, note from the docs:

Note that some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition. The index assigned to each element is therefore not guaranteed, and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
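Following that advice, one way to pin the order down before indexing is to sort first. The docs mention sortByKey(); for testRDD that would sort on the first field only, leaving (3,4) and (3,6) tied, so this sketch swaps in sortBy on the whole tuple instead. It assumes spark-core on the classpath and a local master; `StableIndexExample` and `stableIndex` are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object StableIndexExample {
  // Sort on the whole tuple so element order (and therefore each index)
  // no longer depends on how upstream partitions happened to be laid out.
  def stableIndex(rdd: RDD[(Int, Int)]): RDD[((Int, Int), Long)] =
    rdd.sortBy(x => x).zipWithIndex()

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, only for trying the example out.
    val conf = new SparkConf().setAppName("stable-index-example").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val shuffled = sc.parallelize(List((3, 6), (1, 2), (3, 4)))
      stableIndex(shuffled).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Sorting costs a shuffle, so it is only worth doing when the index must be reproducible across re-evaluations; the docs' other option, saving the RDD to a file, avoids recomputing the upstream lineage altogether.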

Upvotes: 3
