chinkrishna
chinkrishna

Reputation: 121

Add new column to dataframe based on previous values and condition

I have sample dataframe, After grouping by level1 and date i got the resulted dataframe:

val group_df = qwe.groupBy($"level1",$"date").agg(sum("rel_amount").as("amount"))

+------+----------+------+
|level1|      date|amount|
+------+----------+------+
|     A|2016-03-31|   100|     
|     A|2016-02-28|   100|     
|     A|2016-01-31|   400|     
|     A|2015-12-31|   500|     
|     A|2015-11-30|  1200|     
|     A|2015-10-31|  1300|     
|     A|2014-12-31|   600|     
|     B|2016-03-31|    10|     
|     B|2016-02-28|   300|     
|     B|2016-01-31|   423|     
|     B|2015-12-31|   501|    
|     B|2015-11-30|   234|    
|     B|2015-10-31|  1234|    
|     B|2014-12-31|  3456|    
+------+----------+------+

Now I want to add extra column(previous) as year end, in this column I need to get the value for previous year end amount for each group.

For example: for level1 :A, date=2016-03-31 the value should be 500 because it is the amount for 2015-12-31. Similarily, for date= 2015-12-31 the value should be 600 because the amount for 2014-12-31.Need to calculate the previous year end amount for each row.

Expected output :

+------+----------+------+--------+
|level1|      date|amount|Previous|
+------+----------+------+--------+
|     A|2016-03-31|   100|     500|
|     A|2016-02-28|   100|     500|
|     A|2016-01-31|   400|     500|
|     A|2015-12-31|   500|     600|
|     A|2015-11-30|  1200|     600|
|     A|2015-10-31|  1300|     600|
|     A|2014-12-31|   600|     600|
|     B|2016-03-31|    10|     501|
|     B|2016-02-28|   300|     501|
|     B|2016-01-31|   423|     501|
|     B|2015-12-31|   501|    3456|
|     B|2015-11-30|   234|    3456|
|     B|2015-10-31|  1234|    3456|
|     B|2014-12-31|  3456|    3456|
+------+----------+------+--------+

Can someone help me on this.

Upvotes: 1

Views: 1412

Answers (3)

Robin
Robin

Reputation: 695

val amount = ss.sparkContext.parallelize(Seq(("B","2014-12-31", 3456))).toDF("level1", "dateY", "amount")

val yearStr = udf((date:String) => {(date.substring(0,4).toInt - 1) +"-12-31" })   

val df3 = amount.withColumn( "p", yearStr($"dateY"))    

df3.show()    

df3.createOrReplaceTempView("dfView")   

val df4 = df3.filter( s => s.getString(1).contains("12-31")).select( $"dateY".as("p"), $"level1",$"amount".as("am"))    

df4.show
df3.join( df4, Seq("p", "level1"), "left_outer").orderBy("level1", "amount").drop($"p").show()

Upvotes: 1

Leo C
Leo C

Reputation: 22439

One approach would be to use an UDF to manipulate column date as String to create a new column that holds the previous end-of-year value:

val df = Seq(
  ("A", "2016-03-31", 100),
  ("A", "2016-02-28", 100),
  ("A", "2016-01-31", 400),
  ("A", "2015-12-31", 500),
  ("A", "2015-11-30", 1200),
  ("A", "2015-10-31", 1300),
  ("A", "2014-12-31", 600),
  ("B", "2016-03-31", 10),
  ("B", "2016-02-28", 300),
  ("B", "2016-01-31", 423),
  ("B", "2015-12-31", 501),    
  ("B", "2015-11-30", 234),    
  ("B", "2015-10-31", 1234),   
  ("B", "2014-12-31", 3456)
).toDF(
  "level1", "date", "amount"
)

import org.apache.spark.sql.functions._

def previousEOY = udf( (d: String) => (d.substring(0, 4).toInt - 1).toString + "-12-31" )

val df2 = df.withColumn("previous_eoy", previousEOY($"date"))

For the convenience of standard SQL's scalar subquery capability, I'm reverting to using Spark's TempView (Note that max() is used in the subquery simply to satisfy single-row return):

df2.createOrReplaceTempView("dfView")

val df3 = spark.sqlContext.sql("""
  SELECT
    level1, date, amount, (
      SELECT max(amount) FROM dfView v2
      WHERE v2.level1 = v1.level1 AND v2.date = v1.previous_eoy
    ) previous
  FROM
    dfView v1
""")

df3.show
+------+----------+------+--------+
|level1|      date|amount|previous|
+------+----------+------+--------+
|     A|2016-03-31|   100|     500|
|     A|2016-02-28|   100|     500|
|     A|2016-01-31|   400|     500|
|     A|2015-12-31|   500|     600|
|     A|2015-11-30|  1200|     600|
|     A|2015-10-31|  1300|     600|
|     A|2014-12-31|   600|    null|
|     B|2016-03-31|    10|     501|
|     B|2016-02-28|   300|     501|
|     B|2016-01-31|   423|     501|
|     B|2015-12-31|   501|    3456|
|     B|2015-11-30|   234|    3456|
|     B|2015-10-31|  1234|    3456|
|     B|2014-12-31|  3456|    null|
+------+----------+------+--------+

Upvotes: 2

Carlos Bribiescas
Carlos Bribiescas

Reputation: 4407

First, create a dataframe that is year to year-end-value. Then join that into your original data frame where year is equal.

Upvotes: 0

Related Questions