Thom Rogers

Reputation: 1433

How do I average one field while grouping by another using RDDs with pyspark?

I am getting wrapped around the axle between groupBy, aggregate, reduceByKey, map, etc. My goal is to average out field 16 (the last field) for each unique value of field 2.

So the output might look something like:

NW  -8
DL  -6
OO  -1

Given an RDD with the following elements:

[u'2002-04-28,NW,19386,DTW,MI,42.21,-83.35,MSP,MN,44.88,-93.22,1220,1252,32,1316,1350,34', u'2012-05-04,OO,20304,LSE,WI,43.87,-91.25,MSP,MN,44.88,-93.22,1130,1126,-4,1220,1219,-1', u'2002-08-18,NW,19386,BDL,CT,41.93,-72.68,MSP,MN,44.88,-93.22,805,804,-1,959,952,-7', u'2004-07-29,NW,19386,BDL,CT,41.93,-72.68,MSP,MN,44.88,-93.22,800,757,-3,951,933,-18', u'2008-07-21,NW,19386,IND,IN,39.71,-86.29,MSP,MN,44.88,-93.22,1143,1140,-3,1228,1222,-6', u'2007-10-29,NW,19386,RST,MN,43.9,-92.5,MSP,MN,44.88,-93.22,1546,1533,-13,1639,1609,-30', u'2012-12-24,DL,19790,BOS,MA,42.36,-71,MSP,MN,44.88,-93.22,1427,1431,4,1648,1635,-13', u'2010-04-22,DL,19790,DTW,MI,42.21,-83.35,MSP,MN,44.88,-93.22,930,927,-3,1028,1008,-20', u'2010-06-01,DL,19790,DTW,MI,42.21,-83.35,MSP,MN,44.88,93.22,835,846,11,930,946,16', u'2003-09-04,NW,19386,BUF,NY,42.94,-78.73,MSP,MN,44.88,-93.22,900,852,-8,1017,955,-22']

Upvotes: 0

Views: 388

Answers (2)

eliasah

Reputation: 40370

Here is a solution:

data = [u'2002-04-28,NW,19386,DTW,MI,42.21,-83.35,MSP,MN,44.88,-93.22,1220,1252,32,1316,1350,34', u'2012-05-04,OO,20304,LSE,WI,43.87,-91.25,MSP,MN,44.88,-93.22,1130,1126,-4,1220,1219,-1', u'2002-08-18,NW,19386,BDL,CT,41.93,-72.68,MSP,MN,44.88,-93.22,805,804,-1,959,952,-7', u'2004-07-29,NW,19386,BDL,CT,41.93,-72.68,MSP,MN,44.88,-93.22,800,757,-3,951,933,-18', u'2008-07-21,NW,19386,IND,IN,39.71,-86.29,MSP,MN,44.88,-93.22,1143,1140,-3,1228,1222,-6', u'2007-10-29,NW,19386,RST,MN,43.9,-92.5,MSP,MN,44.88,-93.22,1546,1533,-13,1639,1609,-30', u'2012-12-24,DL,19790,BOS,MA,42.36,-71,MSP,MN,44.88,-93.22,1427,1431,4,1648,1635,-13', u'2010-04-22,DL,19790,DTW,MI,42.21,-83.35,MSP,MN,44.88,-93.22,930,927,-3,1028,1008,-20', u'2010-06-01,DL,19790,DTW,MI,42.21,-83.35,MSP,MN,44.88,93.22,835,846,11,930,946,16', u'2003-09-04,NW,19386,BUF,NY,42.94,-78.73,MSP,MN,44.88,-93.22,900,852,-8,1017,955,-22']
current_rdd = sc.parallelize(data)
rdd = (current_rdd
       .map(lambda x: x.split(","))                          # split each CSV line into fields
       .map(lambda x: (x[1], x[-1]))                         # (carrier, arrival delay) pairs
       .groupByKey()                                         # group by key
       .map(lambda x: (x[0], [int(v) for v in x[1]]))        # convert ResultIterable to list of ints
       .map(lambda x: (x[0], float(sum(x[1])) / len(x[1])))) # compute average on list for each key
# output
rdd.take(10)
# [(u'DL', -5.666666666666667), (u'NW', -8.166666666666666), (u'OO', -1.0)]
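
Since the question also mentions aggregate and reduceByKey: the same averages can be computed without groupByKey by accumulating a (sum, count) pair per key. A minimal sketch using aggregateByKey, reusing current_rdd from above (the variable names here are my own):

sum_count = (current_rdd
             .map(lambda x: x.split(","))
             .map(lambda x: (x[1], int(x[-1])))
             .aggregateByKey((0, 0),
                             lambda acc, v: (acc[0] + v, acc[1] + 1),   # fold one value into (sum, count)
                             lambda a, b: (a[0] + b[0], a[1] + b[1])))  # merge partial (sum, count) pairs
averages = sum_count.mapValues(lambda p: float(p[0]) / p[1])
averages.collect()
# [(u'DL', -5.666666666666667), (u'NW', -8.166666666666666), (u'OO', -1.0)]

This avoids materializing every value for a key on one executor, since the (sum, count) pairs are combined map-side before the shuffle.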

Upvotes: 2

ccheneson

Reputation: 49410

Ok, this is a shot in the dark as I don't have an environment to try this on (and that sucks).

I assume you already have an RDD in data that has been split into fields:

mappedData  = data.map(lambda d: (d[1], int(d[-1]))).cache()  # (NW, 34), (OO, -1), (NW, -7)
groupedData = mappedData.groupByKey().mapValues(len)          # (NW, [34, -7]) -> (NW, 2)
sumData     = mappedData.groupByKey().mapValues(sum)          # (NW, [34, -7]) -> (NW, 27)
avgData     = sumData.join(groupedData).map(lambda kv: (kv[0], float(kv[1][0]) / kv[1][1]))  # (NW, (27, 2)) -> (NW, 13.5)
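
The same result can also be reached in one pass, without the double groupByKey and the join, by pairing each delay with a count of 1 and reducing. A sketch under the same assumption that data is already split (names are my own):

pairData = data.map(lambda d: (d[1], (int(d[-1]), 1)))                    # (NW, (34, 1)), (NW, (-7, 1))
sumCount = pairData.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))  # (NW, (27, 2))
avgData  = sumCount.mapValues(lambda p: float(p[0]) / p[1])               # (NW, 13.5)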

Upvotes: 2
