Reputation: 577
I have an RDD with multiple values (a list) against each key, and I want to filter the garbage out of each value under a key.
The RDD has this data:
((key1, [('', val1), ('', val2), ...]), (key2, [...]))
I want to map it to something like this:
((key1, [val1, val2, ...]), (key2, [...]))
I know a map function is required here, but I haven't used map with multiple values against a key.
This is my attempt:
def mapper(x):
    values = []
    for a in x[1]:
        values.append(a[1])
    return (x[0], ap)

listRdd.map(mapper).collect()
But I get a few errors.
Upvotes: 2
Views: 8384
Reputation: 40380
The main idea is to treat each entry of an RDD as a single collection and process it as such. For example, consider the following entry:
entry = ("key1", [('',"val1"),('',"val2")])
To turn this collection into the expected output, we first need to understand its structure:
entry[0]
# 'key1'
entry[1]
# [('', 'val1'), ('', 'val2')]
Now let's work on the second part:
list(map(lambda x: x[1], entry[1]))  # list() is needed on Python 3, where map returns an iterator
# ['val1', 'val2']
We can now define a function, which we'll call mapper, that takes an entry as input and returns a (key, [values...]) tuple. We can then apply mapper to every entry in the RDD.
Putting the code together:
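def mapper(entry):
    # Keep the key; keep only the second element of each ('', value) pair.
    # list() makes the result a real list on Python 3 as well as Python 2.
    return (entry[0], list(map(lambda x: x[1], entry[1])))

data = [("key1", [('', "val1"), ('', "val2")]), ("key2", [('', "val3"), ('', "val2"), ('', "val4")])]
rdd = sc.parallelize(data)  # sc is an existing SparkContext
rdd2 = rdd.map(mapper)
rdd2.collect()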
# [('key1', ['val1', 'val2']), ('key2', ['val3', 'val2', 'val4'])]
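Since the mapper only touches one entry at a time, it can be checked on a plain Python list before involving Spark at all. The following is a minimal sketch of that idea, not part of the code above: the helper name strip_placeholders is hypothetical, and a list comprehension stands in for the map/lambda call.

```python
def strip_placeholders(pairs):
    """Keep only the second element of each (placeholder, value) pair."""
    # Hypothetical helper name, introduced for illustration only.
    return [value for _, value in pairs]


def mapper(entry):
    """Turn (key, [(placeholder, value), ...]) into (key, [value, ...])."""
    key, pairs = entry
    return (key, strip_placeholders(pairs))


data = [
    ("key1", [('', "val1"), ('', "val2")]),
    ("key2", [('', "val3"), ('', "val2"), ('', "val4")]),
]

# Apply the mapper locally, one entry at a time, the same way rdd.map would.
result = [mapper(entry) for entry in data]
print(result)
```

With a SparkContext available, rdd.map(mapper) should give the same result after collect(). Because the key is left unchanged, rdd.mapValues(strip_placeholders) is probably a more direct alternative, as mapValues applies a function to the value of each pair only.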
Upvotes: 3