Prasanna GR

Reputation: 13

Remove duplicates from Spark RDD

I have duplicate records in my file, collected as a list of dictionaries. Here is the content of my sampleRDD variable, which is a pyspark.rdd.RDD object:

[{"A": 111, "B": 222, "C": 333}
,{"A": 111, "B": 222, "C": 333}]

I would like to keep only one record, as follows:

[{"A": 111, "B": 222, "C": 333}]

Upvotes: 1

Views: 3773

Answers (2)

Prasanna GR

Reputation: 13

Found a solution, though I'm not sure if it's efficient ;)

Define a function to remove duplicates:

def remove_repeat(dupRDD):
    return [dict(tup) for tup in set(tuple(item.items()) for item in dupRDD)]
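
A quick sanity check of remove_repeat on a plain Python list (the order of the result is not guaranteed, since a set is used):

print(remove_repeat([{"A": 111, "B": 222, "C": 333},
                     {"A": 111, "B": 222, "C": 333}]))
# [{'A': 111, 'B': 222, 'C': 333}]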

# Assuming 'A' is the main key (can pick any)
sampleRDD_2 = sampleRDD.map(lambda snap: (snap['A'], snap)).groupByKey()

This creates an RDD like [(111, <pyspark.resultiterable.ResultIterable object>)].

The ResultIterable values can be converted to a list and passed to the remove_repeat function:

sampleRDD_2.map(lambda x: (x[0], remove_repeat(list(x[1])))).collect()

This returns the deduplicated list of dicts per key: [(111, [{"A": 111, "B": 222, "C": 333}])]
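
Putting the steps together, the whole pipeline can be chained in one expression (a sketch under the same assumptions, with 'A' as the grouping key):

deduped = (sampleRDD
           .map(lambda snap: (snap['A'], snap))
           .groupByKey()
           .map(lambda x: (x[0], remove_repeat(list(x[1])))))
print(deduped.collect())
# [(111, [{'A': 111, 'B': 222, 'C': 333}])]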

Upvotes: 0

user3689574

Reputation: 1676

There is a problem with calling PySpark's distinct() on an RDD of dictionaries: dictionaries are not hashable, so distinct() fails on them. Converting each dictionary to a tuple of its items is a way around it:

temp = sc.parallelize([{"A": 111, "B": 222, "C": 333},
                       {"A": 111, "B": 222, "C": 333}])

print(temp.map(lambda x: tuple(x.items())).distinct().collect())
    >>[(('A', 111), ('B', 222), ('C', 333))]

Or if you need it back in dictionary form:

print(temp.map(lambda x: tuple(x.items())).distinct().map(lambda x: dict(x)).collect())
    >>[{'A': 111, 'B': 222, 'C': 333}]
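
One caveat: tuple(x.items()) depends on each dictionary's key order, so two records with the same contents but different insertion order would not be treated as duplicates. Sorting the items first avoids this (a sketch, not part of the original answer):

print(temp.map(lambda x: tuple(sorted(x.items()))).distinct().map(dict).collect())
    >>[{'A': 111, 'B': 222, 'C': 333}]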

Upvotes: 1
