Reputation: 2621
I'm trying to learn Spark in Python, and am stuck with combineByKey
for averaging the values in key-value pairs. In fact, my confusion is not with the combineByKey
syntax, but what comes afterward. The typical example (from the O'Reilly 2015 Learning Spark book) can be seen on the web in many places; here's one.
The problem is with the sumCount.map(lambda (key, (totalSum, count)): (key, totalSum / count)).collectAsMap()
statement. Using Spark 2.0.1 and Python 3.5.2, this throws a syntax error exception. Simplifying it to something that should work (and is what's in the O'Reilly book): sumCount.map(lambda key,vals: (key, vals[0]/vals[1])).collectAsMap()
causes Spark to blow up with a wall of Java exceptions, but buried in there I do note a TypeError: <lambda>() missing 1 required positional argument: 'v' error.
Can anyone point me to an example of this functionality that actually works with a recent version of Spark & Python? For completeness, I've included my own minimum working (or rather, non-working) example:
In: pRDD = sc.parallelize([("s",5),("g",3),("g",10),("c",2),("s",10),("s",3),("g",-1),("c",20),("c",2)])
In: cbk = pRDD.combineByKey(lambda x:(x,1), lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1]))
In: cbk.collect()
Out: [('s', (18, 3)), ('g', (12, 3)), ('c', (24, 3))]
In: cbk.map(lambda key,val:(key,val[0]/val[1])).collectAsMap() <-- errors
It's easy enough to compute [(e[0],e[1][0]/e[1][1]) for e in cbk.collect()], but I'd rather get the "Sparkic" way working.
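For reference, here is the same combineByKey call again with its three callbacks annotated; the comments reflect my understanding of the createCombiner / mergeValue / mergeCombiners roles:
cbk = pRDD.combineByKey(
    lambda x: (x, 1),                         # createCombiner: first value seen for a key -> (sum, count)
    lambda acc, x: (acc[0] + x, acc[1] + 1),  # mergeValue: fold another value into the running (sum, count)
    lambda a, b: (a[0] + b[0], a[1] + b[1]))  # mergeCombiners: merge (sum, count) pairs across partitions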
Upvotes: 1
Views: 1073
Reputation: 691
A per-key average of a column can be computed with a window function. Consider the following code:
import pyspark.sql.functions as F
from pyspark.sql import Window
df = spark.createDataFrame([('a', 2), ('b', 3), ('a', 6), ('b', 5)],
                           ['a', 'i'])
win = Window.partitionBy('a')
df.withColumn('avg', F.avg('i').over(win)).show()
Would yield:
+---+---+---+
| a| i|avg|
+---+---+---+
| b| 3|4.0|
| b| 5|4.0|
| a| 2|4.0|
| a| 6|4.0|
+---+---+---+
The average aggregation is done on each worker separately, requires no round trip to the driver, and is therefore efficient.
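If you only need one averaged row per key rather than the average repeated on every row (closer to the collectAsMap result asked for in the question), a plain groupBy aggregation over the same df is a minimal alternative sketch:
df.groupBy('a').agg(F.avg('i').alias('avg')).show()
# one row per key; with this sample data both groups average to 4.0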
Upvotes: 0
Reputation: 330063
Step by step:
lambda (key, (totalSum, count)): ...
is so-called Tuple Parameter Unpacking, which has been removed in Python 3. RDD.map
takes a function which expects a single argument. The function you try to use:
lambda key, vals: ...
is a function which expects two arguments, not one. A valid translation of the 2.x syntax would be:
lambda key_vals: (key_vals[0], key_vals[1][0] / key_vals[1][1])
or:
def get_mean(key_vals):
    key, (total, cnt) = key_vals
    return key, total / cnt

cbk.map(get_mean)
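For completeness, the failure mode from the question can be reproduced in plain Python, no Spark required: map calls its function with exactly one argument (the whole (key, value) tuple), so a two-parameter lambda ends up missing its second argument, which is exactly the TypeError reported above. A tiny sketch using one element from the question's output:
f = lambda key, vals: (key, vals[0] / vals[1])  # 2.x-style two-parameter function
pair = ('s', (18, 3))                           # a single (key, (sum, count)) element, as RDD.map sees it
try:
    f(pair)                                     # called with one argument, the way RDD.map calls it
except TypeError as e:
    print(e)                                    # missing 1 required positional argument: 'vals'
g = lambda kv: (kv[0], kv[1][0] / kv[1][1])     # the single-argument translation
print(g(pair))                                  # ('s', 6.0)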
You can also make this much simpler with mapValues:
cbk.mapValues(lambda x: x[0] / x[1])
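Finished off with collectAsMap, as in the question, this gives the per-key means (the numbers follow from the (sum, count) pairs shown in the question's output):
cbk.mapValues(lambda x: x[0] / x[1]).collectAsMap()
# {'s': 6.0, 'g': 4.0, 'c': 8.0}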
Finally, a numerically stable solution would be:
from pyspark.statcounter import StatCounter
(pRDD
    .combineByKey(
        lambda x: StatCounter([x]),
        StatCounter.merge,
        StatCounter.mergeStats)
    .mapValues(StatCounter.mean))
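As a side note, once you keep a full StatCounter per key you get more than the mean for free; for example, mean and standard deviation together (same pRDD as above):
(pRDD
    .combineByKey(
        lambda x: StatCounter([x]),
        StatCounter.merge,
        StatCounter.mergeStats)
    .mapValues(lambda s: (s.mean(), s.stdev()))
    .collectAsMap())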
Upvotes: 2