Reputation: 865
I have a list of maps, e.g.
[{'a': 10, 'b': 20}, {'a': 5, 'b': 20}, {'b': 20}, {'a': 0, 'b': 20}]
I want to get the average of the values of a and b, counting only the maps in which each key is present. So the expected output is
a = (10 + 5 + 0) / 3 = 5
b = 80 / 4 = 20
How can I do this efficiently using an RDD?
Upvotes: 0
Views: 2875
Reputation: 349
Given the structure of your data, you should be able to use the DataFrame API for this calculation. If you need an RDD, it is not too hard to get from the DataFrame back to one.
from pyspark.sql import functions as F
df = spark.createDataFrame([{'a' : 10,'b': 20}, {'a' : 5,'b': 20} , {'b': 20} ,{'a' : 0,'b': 20}])
Dataframe looks like this
+----+---+
|   a|  b|
+----+---+
|  10| 20|
|   5| 20|
|null| 20|
|   0| 20|
+----+---+
The averages can then be computed with the pyspark.sql functions. F.mean ignores nulls, so the row without a is left out of both the sum and the count for that column:
cols = df.columns
df_means = df.agg(*[F.mean(F.col(col)).alias(col+"_mean") for col in cols])
df_means.show()
OUTPUT:
+------+------+
|a_mean|b_mean|
+------+------+
|   5.0|  20.0|
+------+------+
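The 5.0 for a (rather than 15 / 4 = 3.75) comes from the mean skipping nulls: the row without a contributes to neither the sum nor the count. A rough plain-Python sketch of that arithmetic, with no Spark involved; `rows` and `mean_ignoring_missing` are illustrative names, not part of any API:

```python
# Plain-Python sketch of a null-skipping mean; this is not Spark code.
rows = [{'a': 10, 'b': 20}, {'a': 5, 'b': 20}, {'b': 20}, {'a': 0, 'b': 20}]

def mean_ignoring_missing(rows, key):
    """Average the values for `key`, skipping rows where it is absent or None."""
    values = [r[key] for r in rows if r.get(key) is not None]
    # Return None when no row has the key, instead of dividing by zero
    return sum(values) / len(values) if values else None

means = {key: mean_ignoring_missing(rows, key) for key in ('a', 'b')}
print(means)
```

Note that a value of 0 still counts as present; only a missing or None entry is skipped.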
Upvotes: 1
Reputation: 11453
You can use a defaultdict to collect the values for each key into a list. Then, for each key, divide the sum of its values by the number of elements in its list.
from collections import defaultdict

x = [{'a': 10, 'b': 20}, {'a': 5, 'b': 20}, {'b': 20}, {'a': 0, 'b': 20}]

# Group the values seen for each key
y = defaultdict(list)
for d in x:
    for k, v in d.items():
        y[k].append(v)

# Average = sum of the values / number of values for that key
for k, v in y.items():
    print(k, "=", sum(v) / len(v))

>>> y
defaultdict(<class 'list'>, {'a': [10, 5, 0], 'b': [20, 20, 20, 20]})

Output:
a = 5.0
b = 20.0
Upvotes: 0
Reputation: 214987
The easiest approach might be to map each RDD element to a format like:
init = {'a': {'sum': 0, 'cnt': 0}, 'b': {'sum': 0, 'cnt': 0}}
i.e. record the sum and count for each key, and then reduce it.
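Carrying the sum and count, rather than a running average, matters because a reduce may combine partial results in any grouping, and averaging two averages gives the wrong answer when the groups differ in size. A small plain-Python illustration; the `merge` helper and the split into two partitions are assumptions made for the example:

```python
# Values of 'a' split across two hypothetical partitions
part1, part2 = [10, 5], [0]

def merge(x, y):
    """Combine two (sum, count) pairs; the grouping does not affect the result."""
    return (x[0] + y[0], x[1] + y[1])

total, count = merge((sum(part1), len(part1)), (sum(part2), len(part2)))
correct = total / count  # (15, 3) -> 5.0

# Averaging the per-partition averages weights both partitions equally
avg_of_avgs = (sum(part1) / len(part1) + sum(part2) / len(part2)) / 2

print(correct, avg_of_avgs)  # 5.0 3.75
```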
Map function:
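def map_fun(d, keys=('a', 'b')):
    # Turn one input dict into {key: {'sum': ..., 'cnt': ...}} for every key
    map_d = {}
    for k in keys:
        if k in d:
            temp = {'sum': d[k], 'cnt': 1}
        else:
            # A missing key adds nothing to either the sum or the count
            temp = {'sum': 0, 'cnt': 0}
        map_d[k] = temp
    return map_d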
Reduce function:
def reduce_fun(a, b, keys=('a', 'b')):
    # Merge two partial results by adding their sums and counts per key
    from collections import defaultdict
    reduce_d = defaultdict(dict)
    for k in keys:
        reduce_d[k]['sum'] = a[k]['sum'] + b[k]['sum']
        reduce_d[k]['cnt'] = a[k]['cnt'] + b[k]['cnt']
    return reduce_d

rdd.map(map_fun).reduce(reduce_fun)
# defaultdict(<class 'dict'>, {'a': {'sum': 15, 'cnt': 3}, 'b': {'sum': 80, 'cnt': 4}})
Calculate the average:
d = rdd.map(map_fun).reduce(reduce_fun)
{k: v['sum'] / v['cnt'] for k, v in d.items()}
# {'a': 5.0, 'b': 20.0}
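An alternative that avoids hard-coding the keys is to emit one (key, (value, 1)) pair per entry and combine the pairs by key, which on an RDD would roughly correspond to flatMap followed by reduceByKey. The sketch below emulates that flow in plain Python so it can be run without a cluster; `data`, `to_pairs`, and `combine` are illustrative names, and the Spark translation in the closing comment is an untested assumption.

```python
from collections import defaultdict
from functools import reduce

data = [{'a': 10, 'b': 20}, {'a': 5, 'b': 20}, {'b': 20}, {'a': 0, 'b': 20}]

def to_pairs(d):
    """Emit (key, (value, 1)) for every entry; missing keys emit nothing."""
    return [(k, (v, 1)) for k, v in d.items()]

def combine(x, y):
    """Add two (sum, count) pairs element-wise."""
    return (x[0] + y[0], x[1] + y[1])

# Emulate flatMap: flatten the per-dict pair lists into one list
pairs = [p for d in data for p in to_pairs(d)]

# Emulate reduceByKey: group by key, then fold each group with combine
grouped = defaultdict(list)
for k, sum_cnt in pairs:
    grouped[k].append(sum_cnt)
totals = {k: reduce(combine, v) for k, v in grouped.items()}

averages = {k: s / c for k, (s, c) in totals.items()}
print(averages)  # {'a': 5.0, 'b': 20.0}

# Assumed RDD equivalent (not run here):
# rdd.flatMap(to_pairs).reduceByKey(combine) \
#    .mapValues(lambda t: t[0] / t[1]).collectAsMap()
```

Because a key only appears in the result if at least one input dict contains it, this variant never divides by a zero count.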
Upvotes: 1