balaji

How to get a running sum based on two columns using a Spark Scala RDD

I have data in an RDD with 4 columns: geog, product, time and price. I want to calculate the running sum based on geog and time.

Given Data

Source

I need the result like this:

[Need Output like this]

I need this as a Spark Scala RDD. I am new to the Scala world; I can achieve this easily in SQL, but I want to do it with Spark Scala RDD transformations such as map and flatMap.

Thanks in advance for your help.

Upvotes: 0

Views: 1128

Answers (2)

balaji

I think this will help others as well. I did it with a Scala RDD.

    val fileName_test_1 = "C:\\venkat_workshop\\Qintel\\Data_Files\\test_1.txt"

    val rdd1 = sc.textFile(fileName_test_1)
      .map { line =>
        // parse each line once into (geog, product, time, price)
        val cols = line.split(",")
        (cols(0), cols(1), cols(2), cols(3).toDouble)
      }
      .groupBy(x => (x._1, x._3))                 // group by (geog, time)
      .mapValues { rows =>
        rows.toList
          .sortWith((a, b) => a._4 > b._4)        // order each group by price, descending
          .scanLeft(("", "", "", 0.0, 0.0)) {     // 5th element carries the running sum
            (acc, row) => (row._1, row._2, row._3, row._4, row._4 + acc._5)
          }
          .tail                                   // drop the scanLeft seed
      }
      .flatMapValues(identity)
      .values
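To sanity-check the pipeline without reading a file, the same transformations can be run on a small in-memory RDD. This is only a minimal sketch; the sample rows below are hypothetical and simply follow the geog,product,time,price layout from the question.

    // Hypothetical sample rows in the geog,product,time,price layout.
    val sampleLines = sc.parallelize(Seq(
      "India,A1,Q1,40.0",
      "India,A2,Q1,30.0",
      "India,A3,Q1,21.0",
      "German,A1,Q1,50.0"
    ))

    val runningSums = sampleLines
      .map { line =>
        val cols = line.split(",")
        (cols(0), cols(1), cols(2), cols(3).toDouble)
      }
      .groupBy(x => (x._1, x._3))                 // (geog, time)
      .mapValues { rows =>
        rows.toList
          .sortWith((a, b) => a._4 > b._4)        // price descending
          .scanLeft(("", "", "", 0.0, 0.0)) {
            (acc, row) => (row._1, row._2, row._3, row._4, row._4 + acc._5)
          }
          .tail
      }
      .flatMapValues(identity)
      .values

    runningSums.collect().foreach(println)
    // e.g. (India,A1,Q1,40.0,40.0), (India,A2,Q1,30.0,70.0), (India,A3,Q1,21.0,91.0), ...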

Upvotes: 1

Fokko Driesprong

This is possible by defining a window function:

>>> val data = List(
    ("India","A1","Q1",40),
    ("India","A2","Q1",30),
    ("India","A3","Q1",21),
    ("German","A1","Q1",50),
    ("German","A3","Q1",60),
    ("US","A1","Q1",60),
    ("US","A2","Q2",25),
    ("US","A4","Q1",20),
    ("US","A5","Q5",15),
    ("US","A3","Q3",10)
)

>>> val df = sc.parallelize(data).toDF("country", "part", "quarter", "result")
>>> df.show()

+-------+----+-------+------+
|country|part|quarter|result|
+-------+----+-------+------+
|  India|  A1|     Q1|    40|
|  India|  A2|     Q1|    30|
|  India|  A3|     Q1|    21|
| German|  A1|     Q1|    50|
| German|  A3|     Q1|    60|
|     US|  A1|     Q1|    60|
|     US|  A2|     Q2|    25|
|     US|  A4|     Q1|    20|
|     US|  A5|     Q5|    15|
|     US|  A3|     Q3|    10|
+-------+----+-------+------+

>>> import org.apache.spark.sql.expressions.Window
>>> import org.apache.spark.sql.functions.sum
>>> val window = Window.partitionBy("country").orderBy("part", "quarter")
>>> val resultDF = df.withColumn("agg", sum(df("result")).over(window))
>>> resultDF.show()

+-------+----+-------+------+---+
|country|part|quarter|result|agg|
+-------+----+-------+------+---+
|  India|  A1|     Q1|    40| 40|
|  India|  A2|     Q1|    30| 70|
|  India|  A3|     Q1|    21| 91|
|     US|  A1|     Q1|    60| 60|
|     US|  A2|     Q2|    25| 85|
|     US|  A3|     Q3|    10| 95|
|     US|  A4|     Q1|    20|115|
|     US|  A5|     Q5|    15|130|
| German|  A1|     Q1|    50| 50|
| German|  A3|     Q1|    60|110|
+-------+----+-------+------+---+

You can do this using window functions; take a look at the Databricks blog post introducing window functions in Spark SQL: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
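Since you mentioned the SQL version comes naturally to you, the same window specification can also be written as a Spark SQL query over a temporary view. A minimal sketch, assuming Spark 2.x (where createOrReplaceTempView is available); the view name running_input is just a name chosen here:

>>> df.createOrReplaceTempView("running_input")
>>> val sqlResult = spark.sql("""
      SELECT country, part, quarter, result,
             SUM(result) OVER (PARTITION BY country ORDER BY part, quarter) AS agg
      FROM running_input
    """)
>>> sqlResult.show()

This should produce the same agg column as the withColumn version above.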

Hope this helps.

Happy Sparking! Cheers, Fokko

Upvotes: 2
