Moin

Reputation: 11

SCALA: Group on one column and Sum on another

I am new to Scala. The data contained in the input file is:

a,abcd,1
a,efaf,3
b,xafsdf,2
b,fafdr,3

I need output as follows:

a,4
b,5
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
    val dataRDD = sc.textFile("D:\\scala_samples\\moin.csv")
    // dataRDD.collect().foreach(println(_))
    val splitted = dataRDD.map(line => line.split(","))
    val processed = splitted.map(x => (x(0), x(2)))

How do I proceed from here?

Upvotes: 0

Views: 809

Answers (3)

Balaji Reddy

Reputation: 5710

This approach uses the RDD's aggregateByKey operation.

val seqOp: (Int, (String, String, Int)) => Int = (accum, current) => accum + current._3
val combineOps: (Int, Int) => Int = (accum, current) => accum + current

val baseRDD = sc.textFile("/data.txt").map { x =>
  val splits = x.split(",")
  (splits(0), (splits(0), splits(1), splits(2).toInt))
}.aggregateByKey(0)(seqOp, combineOps)

baseRDD.foreach(println(_))

Note that the map operation does the splitting, the parsing, and the conversion of each record into a key-value pair in a single pass, instead of creating an unnecessary RDD for each of these steps.

To save RDD into a file,

baseRDD.saveAsTextFile("folder path")
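If the saved file needs to match the exact a,4 / b,5 format from the question, one option (my own addition, not part of the original answer; the output path is a placeholder) is to format each pair before writing:

// Turn each (key, sum) pair into a "key,sum" line before saving.
baseRDD
  .map { case (key, total) => s"$key,$total" }
  .saveAsTextFile("output_folder")   // hypothetical path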

For more info, refer to the documentation.

UPDATE

Aggregating on multiple columns with RDDs is a bit cumbersome, whereas it is simple and elegant with SQL. I am updating my solution to use SQL. If an equivalent solution is required on RDDs, let me know.

import spark.implicits._
val df = rdd.toDF("col1", "col2", "col3", "col4", "col5")

import org.apache.spark.sql.functions.sum
df.groupBy($"col1").agg(sum("col3"), sum("col4"), sum("col5")).show()
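For the question's three-column input, a minimal self-contained sketch of the same SQL-style approach could look like this (the column names id, text and value are my own placeholders, not from the original post):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder.appName("GroupSum").master("local[*]").getOrCreate()
import spark.implicits._

// Sample records matching the question; in practice they would come from the CSV file.
val rdd = spark.sparkContext.parallelize(Seq("a,abcd,1", "a,efaf,3", "b,xafsdf,2", "b,fafdr,3"))

val df = rdd.map(_.split(","))
  .map(arr => (arr(0), arr(1), arr(2).toInt))
  .toDF("id", "text", "value")   // hypothetical column names

// Group on the first column and sum the third; yields a,4 and b,5 (row order may vary).
df.groupBy($"id").agg(sum("value")).show()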

Upvotes: 0

Manoj Kumar Dhakad

Reputation: 1892

You have to use the reduceByKey() function. Please find a step-by-step solution below.

// Creating a sample RDD; you can use your own
val dataRDD = sc.parallelize(Array("a,abcd,1", "a,efaf,3", "b,xafsdf,2", "b,fafdr,3"))

// Converting RDD[String] to RDD[(String, Int)] so you can apply the reduceByKey function
val dataRDDMap = dataRDD.map(str => str.split(",")).map(strArr => (strArr(0), strArr(2).toInt))
val wcRDD = dataRDDMap.reduceByKey(_ + _)

//Output:

scala> wcRDD.collect
res6: Array[(String, Int)] = Array((b,5), (a,4))

Upvotes: 0

Assaf Mendelson

Reputation: 13001

Just use dataframe semantics.

First read the data as a dataframe:

val spark = SparkSession.builder.getOrCreate()
val df = spark.read.csv(filename)
df.groupBy("a").sum()
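One caveat, added here as a note of mine: spark.read.csv on a headerless file produces string columns named _c0, _c1, _c2, so the grouping column has to be referenced by those names and the numeric column cast before summing. A minimal sketch under that assumption:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// Headerless CSV: columns come back as _c0, _c1, _c2, all typed as string.
val df = spark.read.csv("D:\\scala_samples\\moin.csv")

// Cast the count column to int, group on the first column, and sum.
df.groupBy($"_c0").agg(sum($"_c2".cast("int"))).show()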

EDIT: For the RDD version try:

processed.reduceByKey(_ + _)

The above basically assumes that processed is an RDD of pairs and that, for each key (the first element), you want to sum the values of the second element.
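Since x(2) in the question's code is still a String, it has to be converted to Int before the values can be summed. A minimal sketch continuing from the question's splitted RDD (variable names taken from the question):

// Parse the count column to Int so reduceByKey can sum it.
val processed = splitted.map(x => (x(0), x(2).toInt))

// Sum the values per key; prints a,4 and b,5 (order may vary).
val summed = processed.reduceByKey(_ + _)
summed.collect().foreach { case (k, v) => println(s"$k,$v") }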

Upvotes: 1
