Azeem Akhter

Reputation: 577

map on multiple values of one key pyspark

I have an RDD with multiple values (a list) per key, and I want to filter the garbage out of each value.

The RDD has this data:

((key1, [('',val1),('', val2),..]),(key2,[...)

I want to map it to something like this:

((key1,[val1, val2,...]), key2[...)

I know a map function is needed here, but I haven't used map with multiple values per key.

This is my attempt:

def mapper(x):
    values = []
    for a in x[1]:
        values.append(a[1])
    return (x[0], ap)

listRdd.map(mapper).collect()

But I get errors when I run it (ap is never defined).

Upvotes: 2

Views: 8384

Answers (1)

eliasah

Reputation: 40380

The main idea is to treat each entry of the RDD as a single collection and process it as such. Concretely, consider the following entry:

entry = ("key1", [('',"val1"),('',"val2")])

To turn this entry into the expected output, we first need to understand its structure:

entry[0] 
# 'key1'

entry[1]
# [('', 'val1'), ('', 'val2')]

Now let's work on the second part:

list(map(lambda x: x[1], entry[1]))
# ['val1', 'val2']
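
Wrapping the map in list() matters on Python 3, where map returns a lazy iterator rather than a list; a list comprehension does the same thing more idiomatically:

[x[1] for x in entry[1]]
# ['val1', 'val2']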

We can now define a function that takes an entry as input and returns a (key, [values...]) tuple. We'll call it mapper and apply it to every entry in the RDD.

Putting the code together:

def mapper(entry):
    # keep the key; extract the second element of each ('', value) pair
    return (entry[0], [x[1] for x in entry[1]])

data = [("key1", [('', "val1"), ('', "val2")]),
        ("key2", [('', "val3"), ('', "val2"), ('', "val4")])]

rdd = sc.parallelize(data)

rdd2 = rdd.map(mapper)

rdd2.collect()
# [('key1', ['val1', 'val2']), ('key2', ['val3', 'val2', 'val4'])]
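
As a side note (a sketch not in the original answer): since only the values change and the keys stay put, RDD.mapValues expresses the same transformation more concisely, and it preserves the RDD's partitioner:

# mapValues applies the function to each value, leaving keys untouched
rdd3 = rdd.mapValues(lambda pairs: [v for _, v in pairs])

rdd3.collect()
# [('key1', ['val1', 'val2']), ('key2', ['val3', 'val2', 'val4'])]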

Upvotes: 3
