Dr. Andrew

Reputation: 2621

Python Spark combineByKey Average

I'm trying to learn Spark in Python, and am stuck on combineByKey for averaging the values in key-value pairs. In fact, my confusion is not with the combineByKey syntax, but with what comes afterward. The typical example (from the O'Reilly 2015 Learning Spark book) can be seen on the web in many places; here's one.

The problem is with the sumCount.map(lambda (key, (totalSum, count)): (key, totalSum / count)).collectAsMap() statement. Using Spark 2.0.1 and IPython 3.5.2, this throws a syntax error exception. Simplifying it to something that should work (and is what's in the O'Reilly book), sumCount.map(lambda key,vals: (key, vals[0]/vals[1])).collectAsMap(), causes Spark to go bats**t crazy with Java exceptions, but I do note a TypeError: <lambda>() missing 1 required positional argument: 'v' error.

Can anyone point me to an example of this functionality that actually works with a recent version of Spark & Python? For completeness, I've included my own minimum working (or rather, non-working) example:

In: pRDD = sc.parallelize([("s",5),("g",3),("g",10),("c",2),("s",10),("s",3),("g",-1),("c",20),("c",2)])
In: cbk = pRDD.combineByKey(lambda x:(x,1), lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1]))
In: cbk.collect()
Out: [('s', (18, 3)), ('g', (12, 3)), ('c', (24, 3))]
In: cbk.map(lambda key,val:(k,val[0]/val[1])).collectAsMap() <-- errors

It's easy enough to compute [(e[0],e[1][0]/e[1][1]) for e in cbk.collect()], but I'd rather get the "Sparkic" way working.

Upvotes: 1

Views: 1073

Answers (2)

Elior Malul

Reputation: 691

Averaging over a specific column can be done by using a window function. Consider the following code:

import pyspark.sql.functions as F
from pyspark.sql import Window
df = spark.createDataFrame([('a', 2), ('b', 3), ('a', 6), ('b', 5)],
                           ['a', 'i'])
win = Window.partitionBy('a')
df.withColumn('avg', F.avg('i').over(win)).show()

This would yield:

+---+---+---+
|  a|  i|avg|
+---+---+---+
|  b|  3|4.0|
|  b|  5|4.0|
|  a|  2|4.0|
|  a|  6|4.0|
+---+---+---+

The average aggregation is done on each worker separately, requires no round trip to the driver, and is therefore efficient.
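If the goal is a single average per key collected back into a plain Python dict (like the collectAsMap result in the question), one way is a groupBy aggregation instead of a window; a minimal sketch, assuming the same df as above (the 'avg' alias is only illustrative):

import pyspark.sql.functions as F

# One row per key instead of one row per input record;
# dict() works because Row objects behave like tuples.
avg_per_key = dict(
    df.groupBy('a')
      .agg(F.avg('i').alias('avg'))
      .collect())
# For the data above: {'a': 4.0, 'b': 4.0}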

Upvotes: 0

zero323

Reputation: 330063

Step by step:

  • lambda (key, (totalSum, count)): ... is so-called tuple parameter unpacking, which has been removed in Python 3 (PEP 3113).
  • RDD.map takes a function which expects a single argument. The function you try to use:

    lambda key, vals: ...
    

    is a function which expects two arguments, not one. A valid translation of the 2.x syntax would be:

    lambda key_vals: (key_vals[0], key_vals[1][0] / key_vals[1][1])
    

    or:

    def get_mean(key_vals):
        key, (total, cnt) = key_vals
        return key, total / cnt
    
    cbk.map(get_mean)
    

    You can also make this much simpler with mapValues (see the usage sketch after this list):

    cbk.mapValues(lambda x: x[0] / x[1])
    
  • Finally, a numerically stable solution would be:

    from pyspark.statcounter import StatCounter
    
    (pRDD
        .combineByKey(
            lambda x: StatCounter([x]),
            StatCounter.merge,
            StatCounter.mergeStats)
        .mapValues(StatCounter.mean))
    
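Applied to the cbk RDD built in the question, a minimal usage sketch (the per-key (sum, count) pairs are taken from the cbk.collect() output shown above, so the averages follow directly from them):

# cbk holds (key, (total, count)) pairs, e.g. ('s', (18, 3))
cbk.mapValues(lambda x: x[0] / x[1]).collectAsMap()
# {'s': 6.0, 'g': 4.0, 'c': 8.0}

The StatCounter pipeline, followed by collectAsMap(), yields the same mapping, while also keeping count, variance, min and max available through the StatCounter objects if you need them before the final mapValues.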

Upvotes: 2
