Reputation: 121
I have a sample dataframe. After grouping by level1 and date I got the following result:
val group_df = qwe.groupBy($"level1",$"date").agg(sum("rel_amount").as("amount"))
+------+----------+------+
|level1| date|amount|
+------+----------+------+
| A|2016-03-31| 100|
| A|2016-02-28| 100|
| A|2016-01-31| 400|
| A|2015-12-31| 500|
| A|2015-11-30| 1200|
| A|2015-10-31| 1300|
| A|2014-12-31| 600|
| B|2016-03-31| 10|
| B|2016-02-28| 300|
| B|2016-01-31| 423|
| B|2015-12-31| 501|
| B|2015-11-30| 234|
| B|2015-10-31| 1234|
| B|2014-12-31| 3456|
+------+----------+------+
Now I want to add an extra column (Previous) holding the previous year-end amount for each group.
For example, for level1 = A and date = 2016-03-31 the value should be 500, because that is the amount for 2015-12-31. Similarly, for date = 2015-12-31 the value should be 600, the amount for 2014-12-31. I need to calculate the previous year-end amount for each row.
Expected output :
+------+----------+------+--------+
|level1| date|amount|Previous|
+------+----------+------+--------+
| A|2016-03-31| 100| 500|
| A|2016-02-28| 100| 500|
| A|2016-01-31| 400| 500|
| A|2015-12-31| 500| 600|
| A|2015-11-30| 1200| 600|
| A|2015-10-31| 1300| 600|
| A|2014-12-31| 600| 600|
| B|2016-03-31| 10| 501|
| B|2016-02-28| 300| 501|
| B|2016-01-31| 423| 501|
| B|2015-12-31| 501| 3456|
| B|2015-11-30| 234| 3456|
| B|2015-10-31| 1234| 3456|
| B|2014-12-31| 3456| 3456|
+------+----------+------+--------+
Can someone help me with this?
Upvotes: 1
Views: 1412
Reputation: 695
// Build a sample input (a single row here for illustration).
val amount = ss.sparkContext.parallelize(Seq(("B", "2014-12-31", 3456))).toDF("level1", "dateY", "amount")
// UDF that maps a date string to the previous year's December 31.
val yearStr = udf((date: String) => (date.substring(0, 4).toInt - 1) + "-12-31")
// Add the previous year-end date as column "p".
val df3 = amount.withColumn("p", yearStr($"dateY"))
df3.show()
// Keep only the year-end rows, renaming their date to "p" so it lines up with the lookup column.
val df4 = df3.filter(s => s.getString(1).contains("12-31")).select($"dateY".as("p"), $"level1", $"amount".as("am"))
df4.show
// Left-join each row to its previous year-end amount ("am") and drop the helper column.
df3.join(df4, Seq("p", "level1"), "left_outer").orderBy("level1", "amount").drop($"p").show()
Upvotes: 1
Reputation: 22439
One approach would be to use a UDF to manipulate column date as a String to create a new column that holds the previous end-of-year date:
val df = Seq(
  ("A", "2016-03-31", 100),
  ("A", "2016-02-28", 100),
  ("A", "2016-01-31", 400),
  ("A", "2015-12-31", 500),
  ("A", "2015-11-30", 1200),
  ("A", "2015-10-31", 1300),
  ("A", "2014-12-31", 600),
  ("B", "2016-03-31", 10),
  ("B", "2016-02-28", 300),
  ("B", "2016-01-31", 423),
  ("B", "2015-12-31", 501),
  ("B", "2015-11-30", 234),
  ("B", "2015-10-31", 1234),
  ("B", "2014-12-31", 3456)
).toDF("level1", "date", "amount")
import org.apache.spark.sql.functions._
def previousEOY = udf( (d: String) => (d.substring(0, 4).toInt - 1).toString + "-12-31" )
val df2 = df.withColumn("previous_eoy", previousEOY($"date"))
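The string manipulation inside the UDF can be sanity-checked in plain Scala without a Spark session; this is just the UDF body extracted as an ordinary function, not part of the pipeline itself:

```scala
// Same logic as the UDF body: take the 4-character year prefix,
// subtract one, and append "-12-31".
def previousEOY(d: String): String =
  (d.substring(0, 4).toInt - 1).toString + "-12-31"

println(previousEOY("2016-03-31")) // 2015-12-31
println(previousEOY("2014-12-31")) // 2013-12-31
```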
For the convenience of standard SQL's scalar-subquery capability, I'm reverting to Spark's TempView (note that max() is used in the subquery simply to guarantee a single-row return):
df2.createOrReplaceTempView("dfView")
val df3 = spark.sqlContext.sql("""
  SELECT
    level1, date, amount,
    (SELECT max(amount) FROM dfView v2
     WHERE v2.level1 = v1.level1 AND v2.date = v1.previous_eoy
    ) AS previous
  FROM dfView v1
""")
df3.show
+------+----------+------+--------+
|level1| date|amount|previous|
+------+----------+------+--------+
| A|2016-03-31| 100| 500|
| A|2016-02-28| 100| 500|
| A|2016-01-31| 400| 500|
| A|2015-12-31| 500| 600|
| A|2015-11-30| 1200| 600|
| A|2015-10-31| 1300| 600|
| A|2014-12-31| 600| null|
| B|2016-03-31| 10| 501|
| B|2016-02-28| 300| 501|
| B|2016-01-31| 423| 501|
| B|2015-12-31| 501| 3456|
| B|2015-11-30| 234| 3456|
| B|2015-10-31| 1234| 3456|
| B|2014-12-31| 3456| null|
+------+----------+------+--------+
Upvotes: 2
Reputation: 4407
First, create a dataframe that maps each year to its year-end value. Then join it back into your original dataframe where the year matches.
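The idea above can be sketched in plain Scala collections (no Spark needed to see the shape of it); in Spark the lookup would be a filter on the Dec-31 rows and the second step a left join. Names and the small data sample here are illustrative:

```scala
// Rows of the grouped dataframe: (level1, date, amount).
val rows = Seq(
  ("A", "2016-03-31", 100), ("A", "2015-12-31", 500), ("A", "2014-12-31", 600),
  ("B", "2015-12-31", 501), ("B", "2014-12-31", 3456)
)

// Step 1: build a (level1, year) -> year-end amount lookup from the Dec-31 rows.
val yearEnd: Map[(String, Int), Int] =
  rows.collect { case (lvl, date, amt) if date.endsWith("12-31") =>
    (lvl, date.take(4).toInt) -> amt
  }.toMap

// Step 2: "join" each row to the year-end amount of the previous year
// (None where no previous year-end row exists, matching the null above).
val withPrevious = rows.map { case (lvl, date, amt) =>
  (lvl, date, amt, yearEnd.get((lvl, date.take(4).toInt - 1)))
}

withPrevious.foreach(println)
```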
Upvotes: 0