akrockz

Reputation: 93

Calculate average using Spark Scala

How do I calculate the average salary per location in Spark Scala with the two data sets below?

File1.csv (column 4 is salary)

Ram, 30, Engineer, 40000  
Bala, 27, Doctor, 30000  
Hari, 33, Engineer, 50000  
Siva, 35, Doctor, 60000

File2.csv (column 2 is location)

Hari, Bangalore  
Ram, Chennai  
Bala, Bangalore  
Siva, Chennai  

The above files are not sorted. I need to join these two files and find the average salary per location. I tried the code below but couldn't get it to work.

val salary = sc.textFile("File1.csv").map(e => e.split(","))  
val location = sc.textFile("File2.csv").map(e.split(","))  
val joined = salary.map(e=>(e(0),e(3))).join(location.map(e=>(e(0),e(1)))  
val joinedData = joined.sortByKey()  
val finalData = joinedData.map(v => (v._1,v._2._1._1,v._2._2))  
val aggregatedDF = finalData.map(e=> e.groupby(e(2)).agg(avg(e(1))))    
aggregatedDF.repartition(1).saveAsTextFile("output.txt")  

Please help with code and sample output how it will look.

Many Thanks

Upvotes: 4

Views: 45069

Answers (4)

Leo C

Reputation: 22449

You can read the CSV files as DataFrames, then join and group them to get the averages:

val df1 = spark.read.csv("/path/to/file1.csv").toDF(
  "name", "age", "title", "salary"
)

val df2 = spark.read.csv("/path/to/file2.csv").toDF(
  "name", "location"
)

import org.apache.spark.sql.functions._

val dfAverage = df1.join(df2, Seq("name")).
  groupBy(df2("location")).agg(avg(df1("salary")).as("average")).
  select("location", "average")

dfAverage.show
+---------+-------+
| location|average|
+---------+-------+
|Bangalore|40000.0|
|  Chennai|50000.0|
+---------+-------+
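As a sanity check, the same join-then-group logic can be mirrored on the sample rows with plain Scala collections (no Spark needed; the maps below are hand-typed from the question's data):

```scala
// Sample data from the question: name -> salary and name -> location.
val salaries  = Map("Ram" -> 40000.0, "Bala" -> 30000.0,
                    "Hari" -> 50000.0, "Siva" -> 60000.0)
val locations = Map("Ram" -> "Chennai", "Bala" -> "Bangalore",
                    "Hari" -> "Bangalore", "Siva" -> "Chennai")

// "Join" on name, then group by location and average the salaries.
val averages = salaries.toSeq
  .map { case (name, sal) => (locations(name), sal) }
  .groupBy(_._1)
  .map { case (loc, rows) => loc -> rows.map(_._2).sum / rows.size }

println(averages("Bangalore"))  // 40000.0
println(averages("Chennai"))    // 50000.0
```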

[UPDATE] For calculating average dimensions:

// file1.csv:
Ram,30,Engineer,40000,600*200
Bala,27,Doctor,30000,800*400
Hari,33,Engineer,50000,700*300
Siva,35,Doctor,60000,600*200

// file2.csv
Hari,Bangalore
Ram,Chennai
Bala,Bangalore
Siva,Chennai

val df1 = spark.read.csv("/path/to/file1.csv").toDF(
  "name", "age", "title", "salary", "dimensions"
)

val df2 = spark.read.csv("/path/to/file2.csv").toDF(
  "name", "location"
)

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

val dfAverage = df1.join(df2, Seq("name")).
  groupBy(df2("location")).
  agg(
    avg(split(df1("dimensions"), ("\\*")).getItem(0).cast(IntegerType)).as("avg_length"),
    avg(split(df1("dimensions"), ("\\*")).getItem(1).cast(IntegerType)).as("avg_width")
  ).
  select(
    $"location", $"avg_length", $"avg_width",
    concat($"avg_length", lit("*"), $"avg_width").as("avg_dimensions")
  )

dfAverage.show
+---------+----------+---------+--------------+
| location|avg_length|avg_width|avg_dimensions|
+---------+----------+---------+--------------+
|Bangalore|     750.0|    350.0|   750.0*350.0|
|  Chennai|     600.0|    200.0|   600.0*200.0|
+---------+----------+---------+--------------+
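Note that `split` takes a regular expression, which is why the asterisk is escaped as `\\*` above. The parsing of a single dimensions value can be checked in plain Scala, without Spark:

```scala
// Parse a "length*width" string; "\\*" escapes the regex metacharacter
// so split matches a literal asterisk.
val Array(length, width) = "600*200".split("\\*").map(_.toInt)

println(length)  // 600
println(width)   // 200
```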

Upvotes: 6

Assaf Mendelson

Reputation: 13001

I would use DataFrames. First read the files, for example:

val salary = spark.read.option("header", "true").csv("File1.csv")
val location = spark.read.option("header", "true").csv("File2.csv")

If you don't have headers, you would need to set the option to "false" and use toDF (as below) or withColumnRenamed to replace the default column names:

val salary = spark.read.option("header", "false").csv("File1.csv").toDF("name", "age", "job", "salary")
val location = spark.read.option("header", "false").csv("File2.csv").toDF("name", "location")

now do the join:

val joined = salary.join(location, "name")

lastly do the average calculation (the `$` syntax needs `import spark.implicits._`, and `avg` comes from `org.apache.spark.sql.functions`):

import org.apache.spark.sql.functions.avg
import spark.implicits._

val avgSalary = joined.groupBy("location").agg(avg($"salary"))

to save do:

avgSalary.repartition(1).write.csv("output.csv")

Upvotes: 2

Raphael Roth

Reputation: 27373

I would use the DataFrame API; this should work:

import spark.implicits._

val salary = sc.textFile("File1.csv")
               .map(_.split(",").map(_.trim))
               .map{case Array(name,_,_,salary) => (name,salary)}
               .toDF("name","salary")

val location = sc.textFile("File2.csv")
                 .map(_.split(",").map(_.trim))
                 .map{case Array(name,location) => (name,location)}
                 .toDF("name","location")

import org.apache.spark.sql.functions._

salary
  .join(location,Seq("name"))
  .groupBy($"location")
  .agg(
    avg($"salary").as("avg_salary")
  )
  .repartition(1)
  .write.csv("output.csv")

Upvotes: 3

Harald Gliebe

Reputation: 7564

You could do something like this:

val salary = sc.textFile("File1.csv").map(_.split(",").map(_.trim))
val location = sc.textFile("File2.csv").map(_.split(",").map(_.trim))
val joined = salary.map(e=>(e(0),e(3).toInt)).join(location.map(e=>(e(0),e(1))))
val locSalary = joined.map(v => (v._2._2, v._2._1))
val averages = locSalary.aggregateByKey((0, 0))(
  (t, e) => (t._1 + 1, t._2 + e),
  (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)
).mapValues(t => t._2 / t._1)

then averages.take(10) will give:

res5: Array[(String, Int)] = Array((Chennai,50000), (Bangalore,40000))
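The `aggregateByKey` call above threads a `(count, sum)` pair per key. Its two combining functions can be checked locally without a cluster (the names `addSalary` and `merge` here are illustrative, not from the answer):

```scala
// (count, sum) accumulator, mirroring the aggregateByKey arguments above.
def addSalary(acc: (Int, Int), salary: Int): (Int, Int) =
  (acc._1 + 1, acc._2 + salary)                // seqOp: fold one salary in
def merge(a: (Int, Int), b: (Int, Int)): (Int, Int) =
  (a._1 + b._1, a._2 + b._2)                   // combOp: merge partial results

// Chennai salaries from the sample data: Ram 40000, Siva 60000.
val (count, sum) = List(40000, 60000).foldLeft((0, 0))(addSalary)
val avgChennai = sum / count                   // integer division -> Int

println(avgChennai)  // 50000
```

Because the final division is integer division, the averages come out as `Int`; use `mapValues(t => t._2.toDouble / t._1)` if fractional averages are wanted.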

Upvotes: 0
