Ramesh

Reputation: 1593

Spark dataframe data aggregation

I have the below requirement to aggregate data on a Spark dataframe in Scala.

I have a spark dataframe with two columns.

mo_id   sales
201601  11.01
201602  12.01
201603  13.01
201604  14.01
201605  15.01
201606  16.01
201607  17.01
201608  18.01
201609  19.01
201610  20.01
201611  21.01
201612  22.01

As shown above, the dataframe has two columns, 'mo_id' and 'sales'. I want to add a new column (agg_sales) to the dataframe which should hold the sum of sales up to the current month, as shown below (with round sales values for simplicity).

mo_id   sales   agg_sales
201601  10  10
201602  20  30
201603  30  60
201604  40  100
201605  50  150
201606  60  210
201607  70  280
201608  80  360
201609  90  450
201610  100 550
201611  110 660
201612  120 780

Description:

For the month 201603, agg_sales will be the sum of sales from 201601 to 201603; for the month 201604, it will be the sum of sales from 201601 to 201604; and so on.
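To illustrate what I mean, this is the same running total computed on a plain Scala collection (using the round values from the expected output above):

// Running total on a plain Scala sequence, for illustration only
val sales = Seq(10, 20, 30, 40)
val aggSales = sales.scanLeft(0)(_ + _).tail // Seq(10, 30, 60, 100)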

Can anyone please help me do this?

Versions used: Spark 1.6.2 and Scala 2.10

Upvotes: 1

Views: 843

Answers (1)

evan.oman

Reputation: 5572

You are looking for a cumulative sum, which can be accomplished with a window function:

scala> val df = sc.parallelize(Seq((201601, 10), (201602, 20), (201603, 30), (201604, 40), (201605, 50), (201606, 60), (201607, 70), (201608, 80), (201609, 90), (201610, 100), (201611, 110), (201612, 120))).toDF("id","sales")
df: org.apache.spark.sql.DataFrame = [id: int, sales: int]

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val ordering = Window.orderBy("id")
ordering: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@75d454a4

scala> df.withColumn("agg_sales", sum($"sales").over(ordering)).show 
16/12/27 21:11:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+------+-----+-------------+
|    id|sales|  agg_sales  |
+------+-----+-------------+
|201601|   10|           10|
|201602|   20|           30|
|201603|   30|           60|
|201604|   40|          100|
|201605|   50|          150|
|201606|   60|          210|
|201607|   70|          280|
|201608|   80|          360|
|201609|   90|          450|
|201610|  100|          550|
|201611|  110|          660|
|201612|  120|          780|
+------+-----+-------------+

Note that I defined the ordering on the ids; you would probably want some sort of timestamp to order the summation.
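If you prefer to make the frame explicit and use your original column names, something along these lines should also work. This is only a sketch: on Spark 1.6.x the DataFrame has to come from a HiveContext for window functions to be available, and salesDF below stands in for your DataFrame with the mo_id and sales columns.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Frame from the first row up to the current row, ordered by month
// (Long.MinValue plays the role of "unbounded preceding" in Spark 1.6)
val byMonth = Window.orderBy("mo_id").rowsBetween(Long.MinValue, 0)

// salesDF is assumed to be your original DataFrame with columns mo_id and sales
val withAgg = salesDF.withColumn("agg_sales", sum("sales").over(byMonth))
withAgg.show()

This still moves all rows to a single partition (as the warning above says), which is fine for a dataset of this size.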

Upvotes: 1
