Willard

Reputation: 522

How to use PySpark to calculate an average on an RDD

Given the following code, I'm trying to calculate the average of the floating-point column on a per-month basis.

dataset_rdd = sc.parallelize(
 [['JAN', 'NY', 3.0],
 ['JAN', 'PA', 1.0],
 ['JAN', 'NJ', 2.0],
 ['JAN', 'CT', 4.0],
 ['FEB', 'PA', 1.0],
 ['FEB', 'NJ', 1.0],
 ['FEB', 'NY', 2.0],
 ['FEB', 'VT', 1.0],
 ['MAR', 'NJ', 2.0],
 ['MAR', 'NY', 1.0],
 ['MAR', 'VT', 2.0],
 ['MAR', 'PA', 3.0]])

def avg_map_func(row):
    return (row[0], (row[2], 1))

def avg_reduce_func(value1, value2):
    return (value1[0], (value1[1][0] + value2[1][0], value1[1][1] + value2[1][1]))

dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).collect()

From a high level point of view, I was trying to first use map to create an RDD of the following form:

[('JAN', (3.0, 1)),
 ('JAN', (1.0, 1)),
 ('JAN', (2.0, 1)),
 ('JAN', (4.0, 1)),
 ('FEB', (1.0, 1)),
 ('FEB', (1.0, 1)),
 ('FEB', (2.0, 1)),
 ('FEB', (1.0, 1)),
 ('MAR', (2.0, 1)),
 ('MAR', (1.0, 1)),
 ('MAR', (2.0, 1)),
 ('MAR', (3.0, 1))]

Then I wanted to use the reduceByKey function to add up the ones and the floats by key, creating a new RDD that contains one row per month, with a tuple holding the total of the floats and an integer counting the rows. For example, the JAN row would look like this:

('JAN', (10.0, 4))

However, I don't seem to be able to index into the tuple correctly and end up with a runtime error in the reduceByKey function.

Question 1: Why can't I index into the tuple in avg_reduce_func?

Question 2: How can this code be rewritten to calculate the average of the floating-point column on a per-month basis?

Upvotes: 3

Views: 7245

Answers (2)

Yosi Hammer

Reputation: 588

Is there a particular reason for you to use RDDs?

This is straightforward to do with DataFrames, and it will be more efficient:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, count

d = [['JAN', 'NY', 3.0],
 ['JAN', 'PA', 1.0],
 ['JAN', 'NJ', 2.0],
 ['JAN', 'CT', 4.0],
 ['FEB', 'PA', 1.0],
 ['FEB', 'NJ', 1.0],
 ['FEB', 'NY', 2.0],
 ['FEB', 'VT', 1.0],
 ['MAR', 'NJ', 2.0],
 ['MAR', 'NY', 1.0],
 ['MAR', 'VT', 2.0],
 ['MAR', 'PA', 3.0]] 

spark = SparkSession.builder.getOrCreate()

# give the auto-generated _1/_2/_3 columns meaningful names
df = spark.createDataFrame(d).selectExpr(
    "_1 as month", "_2 as state", "_3 as float_col")
df.show()

'''
+-----+-----+---------+
|month|state|float_col|
+-----+-----+---------+
|  JAN|   NY|      3.0|
|  JAN|   PA|      1.0|
|  JAN|   NJ|      2.0|
|  JAN|   CT|      4.0|
|  FEB|   PA|      1.0|
|  FEB|   NJ|      1.0|
|  FEB|   NY|      2.0|
|  FEB|   VT|      1.0|
|  MAR|   NJ|      2.0|
|  MAR|   NY|      1.0|
|  MAR|   VT|      2.0|
|  MAR|   PA|      3.0|
+-----+-----+---------+
'''

agg_df = df.groupBy("month").agg(
    sum('float_col').alias('float_sum'),
    count('month').alias('month_count')
)
agg_df.show()

'''
+-----+---------+-----------+
|month|float_sum|month_count|
+-----+---------+-----------+
|  FEB|      5.0|          4|
|  JAN|     10.0|          4|
|  MAR|      8.0|          4|
+-----+---------+-----------+
'''
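
If only the per-month average is needed, the same DataFrame can produce it in one step with the built-in avg function. A minimal sketch, assuming the df defined above and its column names:

from pyspark.sql.functions import avg

# should give JAN=2.5, FEB=1.25 and MAR=2.0 on the sample data (row order may vary)
df.groupBy("month").agg(avg("float_col").alias("float_avg")).show()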

Upvotes: 0

Willard

Reputation: 522

I figured it out: I was trying to access the key inside avg_reduce_func, but reduceByKey only passes in the values. I ended up with the following:

def avg_map_func(row):
    # map each row to (month, (float value, 1)) so counts can be summed alongside the values
    return (row[0], (row[2], 1))

def avg_reduce_func(value1, value2):
    # reduceByKey passes in only the values, i.e. (sum_so_far, count_so_far) tuples
    return (value1[0] + value2[0], value1[1] + value2[1])

dataset_rdd.map(avg_map_func).reduceByKey(avg_reduce_func).mapValues(lambda x: x[0]/x[1]).collect()
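
To make Question 1 concrete: reduceByKey hands the reduce function only the value parts of the pairs, so on the sample data the arguments look like (3.0, 1) and (1.0, 1) rather than ('JAN', (3.0, 1)). A small standalone sketch, with the literals taken from the JAN rows above:

value1, value2 = (3.0, 1), (1.0, 1)  # what reduceByKey actually passes in

# The original avg_reduce_func assumed value1 was ('JAN', (3.0, 1)), so
# value1[1][0] indexes into the int 1 and raises
# TypeError: 'int' object is not subscriptable.

# The corrected function just sums the components:
print((value1[0] + value2[0], value1[1] + value2[1]))  # (4.0, 2)

On the full dataset, the final collect() should return the per-month averages, something like [('JAN', 2.5), ('FEB', 1.25), ('MAR', 2.0)] (tuple order may vary).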

Upvotes: 3
