Reputation: 4991
Assuming that I have the following RDD:
test1 = (('trial1',[1,2]),('trial2',[3,4]))
test1RDD = sc.parallelize(test1)
How can I create the following RDD:
((1,'trial1',[1,2]),(2,'trial2',[3,4]))
I tried with accumulators, but it doesn't work: an accumulator's value can only be read on the driver, not inside tasks (and even if it could, updates from parallel tasks arrive in no fixed order, so the counts would not be reliable indices):
def increm(keyvalue):
    global acc
    acc += 1  # tasks are only allowed to add to an accumulator
    # fails here: acc.value can only be read on the driver, not inside a task
    return (acc.value, keyvalue[0], keyvalue[1])

acc = sc.accumulator(0)
test1RDD.map(lambda x: increm(x)).collect()
Any idea how this can be done?
Upvotes: 1
Views: 2867
Reputation: 49410
You can use zipWithIndex
zipWithIndex()
Zips this RDD with its element indices.
The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index.
This method needs to trigger a Spark job when this RDD contains more than one partition.
>>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
[('a', 0), ('b', 1), ('c', 2), ('d', 3)]
and then use map to move the index to the front of each element.
This is untested, as I don't have an environment at hand:
test1 = (('trial1',[1,2]),('trial2',[3,4]))
test1RDD = sc.parallelize(test1)
test1RDD.zipWithIndex().map(lambda x: (x[1], x[0]))
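Note that zipWithIndex starts counting at 0 and keeps the original element nested, so to produce exactly the triples asked for you would add 1 to the index and unpack the key/value pair. A minimal sketch, under the same untested caveat and using only the test1RDD defined above:

test1RDD.zipWithIndex() \
    .map(lambda x: (x[1] + 1, x[0][0], x[0][1])) \
    .collect()
# expected: [(1, 'trial1', [1, 2]), (2, 'trial2', [3, 4])]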
Upvotes: 8