abbassix

Reputation: 645

running Map on a nested RDD

I have a huge RDD with hundreds of millions of records whose structure looks like this: rdd.collect() results in:

[['t1', 't5', 't7'],
['t1', 't7'], ['t2', 't3'],
['t1', 't3', 't4'], ...]

Suppose this is the entire RDD and not just a snapshot of it. What I want is this: desired.collect() results in:

('t1', <pyspark.resultiterable.ResultIterable object at 0x7f534107ead0>)
('t5', <pyspark.resultiterable.ResultIterable object at 0x7f534107e090>)
('t7', <pyspark.resultiterable.ResultIterable object at 0x7f534107e050>)
('t2', <pyspark.resultiterable.ResultIterable object at 0x7f534107efd0>)
('t3', <pyspark.resultiterable.ResultIterable object at 0x7f534107e510>)
('t4', <pyspark.resultiterable.ResultIterable object at 0x7f534107ed50>)

Where, for example:

for x in desired.collect()[0][1]:
  print(x)

results in:

t5
t7
t3
t4

because they share a list with t1.
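To make the intent concrete, here is a plain-Python (non-Spark) sketch of the grouping I am after, assuming each key should map to every other element that shares at least one list with it:

from collections import defaultdict

data = [['t1', 't5', 't7'],
        ['t1', 't7'], ['t2', 't3'],
        ['t1', 't3', 't4']]

# for each element, collect the other elements that appear in the same list
groups = defaultdict(set)
for sublist in data:
    for x in sublist:
        groups[x].update(y for y in sublist if y != x)

print(groups['t1'])
# {'t5', 't7', 't3', 't4'}

Of course this does not scale to hundreds of millions of records, which is why I need the equivalent on the RDD itself.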

Upvotes: 0

Views: 74

Answers (1)

Anjaneya Tripathi

Reputation: 1459

Continuing from your last question, we can use the same method along with groupByKey and mapValues:

import itertools

data2 = [['t1', 't5', 't7'], ['t1', 't7'], ['t2', 't3'], ['t1', 't3', 't4']]
rd2 = sc.parallelize(data2)

# emit every pair within a sublist, group by the first element,
# and collapse each group to a set to drop duplicates
rd2 = rd2.flatMap(lambda x: itertools.combinations(x, 2)).groupByKey().mapValues(set)

for x in rd2.collect()[0][1]:
  print(x)

#output
t5
t7
t3
t4

Edit - I have added .mapValues(set) to remove duplicates from the results.
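Note that combinations emits each pair only once, so only elements that appear first in some pair end up as keys (for example 't4' never becomes a key). If you need every element to appear as a key, one option is to emit ordered pairs in both directions with itertools.permutations instead; a minimal sketch, reusing sc and data2 from above:

rd3 = sc.parallelize(data2).flatMap(lambda x: itertools.permutations(x, 2)).groupByKey().mapValues(set)

for k, v in sorted(rd3.collect()):
    print(k, sorted(v))

# t1 ['t3', 't4', 't5', 't7']
# t2 ['t3']
# t3 ['t1', 't2', 't4']
# t4 ['t1', 't3']
# t5 ['t1', 't7']
# t7 ['t1', 't5']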

Upvotes: 1
