Vinny

Reputation: 865

Aggregate List of Map in PySpark

I have a list of maps, e.g.

[{'a': 10, 'b': 20}, {'a': 5, 'b': 20}, {'b': 20}, {'a': 0, 'b': 20}]

I want to get the average of the values of a and b, counting a key only in the maps where it appears. So the expected output is

a = (10 + 5 + 0) / 3 = 5;
b = 80 / 4 = 20.

How can I do this efficiently using an RDD?
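For reference, the expected numbers can be reproduced in plain Python (no Spark), averaging each key over only the dicts in which it appears:

```python
data = [{'a': 10, 'b': 20}, {'a': 5, 'b': 20}, {'b': 20}, {'a': 0, 'b': 20}]

# average each key over only the dicts where it appears
keys = {k for d in data for k in d}
avgs = {k: sum(d[k] for d in data if k in d) / sum(1 for d in data if k in d)
        for k in keys}
# avgs == {'a': 5.0, 'b': 20.0}
```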

Upvotes: 0

Views: 2875

Answers (3)

johnmdonich

Reputation: 349

Given the structure of your data, you should be able to use the DataFrame API for this calculation. If you need an RDD, it is not too hard to get from the DataFrame back to an RDD.

from pyspark.sql import functions as F

# `spark` is the SparkSession (created automatically in the pyspark shell)
df = spark.createDataFrame([{'a': 10, 'b': 20}, {'a': 5, 'b': 20}, {'b': 20}, {'a': 0, 'b': 20}])

The DataFrame looks like this:

+----+---+
|   a|  b|
+----+---+
|  10| 20|
|   5| 20|
|null| 20|
|   0| 20|
+----+---+

Averages can then be computed with the pyspark.sql functions:

cols = df.columns
df_means = df.agg(*[F.mean(F.col(col)).alias(col+"_mean") for col in cols])
df_means.show()

OUTPUT:

+------+------+
|a_mean|b_mean|
+------+------+
|   5.0|  20.0|
+------+------+

Upvotes: 1

Anil_M

Reputation: 11453

You can use defaultdict to collect the values for each key into a list, then compute each average as the sum of a list divided by its length.

from collections import defaultdict

x = [{'a': 10, 'b': 20}, {'a': 5, 'b': 20}, {'b': 20}, {'a': 0, 'b': 20}]

# collect the values for each key into a list
y = defaultdict(list)
for d in x:
    for k, v in d.items():
        y[k].append(v)

for k, v in sorted(y.items()):
    print(k, "=", sum(v) / len(v))

>>> y
defaultdict(<class 'list'>, {'a': [10, 5, 0], 'b': [20, 20, 20, 20]})

a = 5.0
b = 20.0
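Storing every value in a list can be avoided: a one-pass variant (a plain-Python sketch, not Spark code) that keeps only a running (sum, count) pair per key gives the same averages with less memory:

```python
from collections import defaultdict

x = [{'a': 10, 'b': 20}, {'a': 5, 'b': 20}, {'b': 20}, {'a': 0, 'b': 20}]

# keep a running [sum, count] per key instead of a full list of values
totals = defaultdict(lambda: [0, 0])
for d in x:
    for k, v in d.items():
        totals[k][0] += v
        totals[k][1] += 1

averages = {k: s / c for k, (s, c) in totals.items()}
# averages == {'a': 5.0, 'b': 20.0}
```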

Upvotes: 0

akuiper

Reputation: 214987

The easiest might be to map each RDD element to a format like:

init = {'a': {'sum': 0, 'cnt': 0}, 'b': {'sum': 0, 'cnt': 0}}

i.e. record the sum and count for each key, and then reduce it.

Map function:

def map_fun(d, keys=['a', 'b']):
    map_d = {}
    for k in keys:
        if k in d:
            temp = {'sum': d[k], 'cnt': 1}
        else:
            temp = {'sum': 0, 'cnt': 0}
        map_d[k] = temp
    return map_d

Reduce function:

def reduce_fun(a, b, keys=['a', 'b']):
    from collections import defaultdict
    reduce_d = defaultdict(dict)
    for k in keys:
        reduce_d[k]['sum'] = a[k]['sum'] + b[k]['sum']
        reduce_d[k]['cnt'] = a[k]['cnt'] + b[k]['cnt']
    return reduce_d

rdd.map(map_fun).reduce(reduce_fun)
# defaultdict(<class 'dict'>, {'a': {'sum': 15, 'cnt': 3}, 'b': {'sum': 80, 'cnt': 4}})

Calculate the average:

d = rdd.map(map_fun).reduce(reduce_fun)
{k: v['sum'] / v['cnt'] for k, v in d.items()}
# {'a': 5.0, 'b': 20.0}  (5 and 20 under Python 2 integer division)
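Since map_fun and reduce_fun are plain Python, the pipeline can be checked locally with functools.reduce before running it on a real RDD (a sketch repeating the functions above so it is self-contained):

```python
from collections import defaultdict
from functools import reduce

def map_fun(d, keys=('a', 'b')):
    # emit a {'sum': value, 'cnt': 1} record per key, zeros when the key is absent
    return {k: {'sum': d.get(k, 0), 'cnt': 1 if k in d else 0} for k in keys}

def reduce_fun(a, b, keys=('a', 'b')):
    # add the per-key sums and counts of two records
    reduce_d = defaultdict(dict)
    for k in keys:
        reduce_d[k]['sum'] = a[k]['sum'] + b[k]['sum']
        reduce_d[k]['cnt'] = a[k]['cnt'] + b[k]['cnt']
    return reduce_d

data = [{'a': 10, 'b': 20}, {'a': 5, 'b': 20}, {'b': 20}, {'a': 0, 'b': 20}]
d = reduce(reduce_fun, map(map_fun, data))   # mirrors rdd.map(...).reduce(...)
averages = {k: v['sum'] / v['cnt'] for k, v in d.items()}
# averages == {'a': 5.0, 'b': 20.0}
```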

Upvotes: 1
