mrsquid

Reputation: 635

Pyspark RDD aggregate different value fields differently

This is a pretty open ended question, but I have an RDD in this format.

[('2014-06', ('131313', 5.5, 6.5, 7.5, 10.5 )),
('2014-07', ('246655', 636636.53, .53252, 5252.112, 5242.23)),
('2014-06', ('131232', 1, 2, 4.5, 5.5)),
('2014-07', ('131322464', 536363.6363, 536336.6363, 3563.63636, 9.6464464646464646))]
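For reference, here is a minimal sketch of how an RDD like this can be built, assuming a SparkContext is already available as sc:

# assuming an existing SparkContext `sc`
data = [('2014-06', ('131313', 5.5, 6.5, 7.5, 10.5)),
        ('2014-07', ('246655', 636636.53, .53252, 5252.112, 5242.23)),
        ('2014-06', ('131232', 1, 2, 4.5, 5.5)),
        ('2014-07', ('131322464', 536363.6363, 536336.6363, 3563.63636, 9.6464464646464646))]
rdd = sc.parallelize(data)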

I want to group by the key and aggregate each of the value fields differently. For example, for the key '2014-06' I want to get the count of the first value field (i.e. '131313') and the average of each of the other fields (5.5, 6.5, 7.5, 10.5).

So the result for the simple example above for key '2014-06' would be ('2014-06', (2, 3.25, 4.25, 6.0, 8.0)): a count of 2, followed by the element-wise averages of (5.5, 6.5, 7.5, 10.5) and (1, 2, 4.5, 5.5).

What would be the best method to do this with an RDD? I cannot use any Spark SQL expressions or functions, only RDD functions.

I was thinking about doing something with mapValues and using some other function but I am having some trouble formulating this function.

I know this question is pretty open ended, so please let me know if you have any more questions.

Thank you for your time.

Upvotes: 3

Views: 1279

Answers (2)

blackbishop

Reputation: 32650

@jxc's solution does what you need, but here is another way of doing it.

You can use aggregateByKey. This function takes a neutral "zero value" accumulator plus two functions: seqFunc, which folds one record into an accumulator within a partition, and combFunc, which merges the accumulators from different partitions.

from operator import add

zero_value = (0, 0, 0, 0, 0)
d = rdd.aggregateByKey(
        zero_value,
        # seqFunc: bump the count and add the numeric fields element-wise
        lambda acc, v: (acc[0] + 1, *map(add, acc[1:], v[1:])),
        # combFunc: merge two partial accumulators element-wise
        lambda a, b: tuple(map(add, a, b))
    ).mapValues(lambda v: (v[0], *[i / v[0] for i in v[1:]]))

The first lambda expression (the seqFunc) folds each record into the accumulator: it increments the count that stands in for the string field and adds the remaining numeric fields element-wise. The second lambda expression (the combFunc) merges two accumulators by element-wise addition.

After this aggregation, we just need to divide every element of each value tuple except the first by the first element (the count), which gives the averages. For '2014-06', for example, the aggregated tuple is (2, 6.5, 8.5, 12.0, 16.0), and dividing by the count of 2 yields (2, 3.25, 4.25, 6.0, 8.0).

Output:

[('2014-06', (2, 3.25, 4.25, 6.0, 8.0)), ('2014-07', (2, 586500.0831500001, 268168.58441, 4407.87418, 2625.938223232323))]

Upvotes: 1

jxc
jxc

Reputation: 13998

One way is to use the map() method to convert the first value to 1 (for record counting), then use reduceByKey() to sum the value tuples element-wise for each key. Finally, use mapValues() to calculate the mean of every field except the first, which is the count (kept as-is).

(rdd
 # replace the string field with 1 so records can be counted by summing
 .map(lambda x: (x[0], (1, *x[1][1:])))
 # element-wise sum per key, then divide the sums by the count for the mean
 .reduceByKey(lambda x, y: tuple(a + b for a, b in zip(x, y)))
 .mapValues(lambda x: (x[0], *[e / x[0] for e in x[1:]])))

After map():

[('2014-06', (1, 5.5, 6.5, 7.5, 10.5)),
 ('2014-07', (1, 636636.53, 0.53252, 5252.112, 5242.23)),
 ('2014-06', (1, 1, 2, 4.5, 5.5)),
 ('2014-07', (1, 536363.6363, 536336.6363, 3563.63636, 9.646446464646464))]

After reduceByKey():

[('2014-06', (2, 6.5, 8.5, 12.0, 16.0)),
 ('2014-07',
  (2, 1173000.1663000002, 536337.16882, 8815.74836, 5251.876446464646))]

After mapValues():

[('2014-06', (2, 3.25, 4.25, 6.0, 8.0)),
 ('2014-07',
  (2, 586500.0831500001, 268168.58441, 4407.87418, 2625.938223232323))]
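A quick usage sketch to materialize the result: collect() after an optional sortByKey(), since the order of collect() is not otherwise guaranteed:

result = (rdd
          .map(lambda x: (x[0], (1, *x[1][1:])))
          .reduceByKey(lambda x, y: tuple(a + b for a, b in zip(x, y)))
          .mapValues(lambda x: (x[0], *[e / x[0] for e in x[1:]]))
          .sortByKey())  # sort for a deterministic output order
print(result.collect())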

Upvotes: 1
