I've processed a Parquet file and created the following DataFrame in Scala Spark 2.4.3:
+-----------+------------+-----------+--------------+-----------+
| itemno|requestMonth|requestYear|totalRequested|requestDate|
+-----------+------------+-----------+--------------+-----------+
| 7512365| 2| 2014| 110.0| 2014-02-01|
| 7519278| 4| 2013| 96.0| 2013-04-01|
|5436134-070| 12| 2013| 8.0| 2013-12-01|
| 7547385| 1| 2014| 89.0| 2014-01-01|
| 0453978| 9| 2014| 18.0| 2014-09-01|
| 7558402| 10| 2014| 260.0| 2014-10-01|
|5437662-070| 7| 2013| 78.0| 2013-07-01|
| 3089858| 11| 2014| 5.0| 2014-11-01|
| 7181584| 2| 2017| 4.0| 2017-02-01|
| 7081417| 3| 2017| 15.0| 2017-03-01|
| 5814215| 4| 2017| 35.0| 2017-04-01|
| 7178940| 10| 2014| 5.0| 2014-10-01|
| 0450636| 1| 2015| 7.0| 2015-01-01|
| 5133406| 5| 2014| 46.0| 2014-05-01|
| 2204858| 12| 2015| 34.0| 2015-12-01|
| 1824299| 5| 2015| 1.0| 2015-05-01|
|5437474-620| 8| 2015| 4.0| 2015-08-01|
| 3086317| 9| 2014| 1.0| 2014-09-01|
| 2204331| 3| 2015| 2.0| 2015-03-01|
| 5334160| 1| 2018| 2.0| 2018-01-01|
+-----------+------------+-----------+--------------+-----------+
To derive a new feature, I am trying to apply some logic and rearrange the DataFrame as follows:
itemno - unchanged from the DataFrame above
startDate - the start of the season
endDate - the end of the season
totalRequested - the number of parts requested in that season
percentageOfRequests - totalRequested in the current season / total over this season plus the 3 previous seasons (4 seasons in total)
Season dates for reference:
Spring: 1 March to 31 May.
Summer: 1 June to 31 August.
Autumn: 1 September to 30 November.
Winter: 1 December to 28 February (29 February in leap years).
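Since every boundary in the season list falls on the first day of a month, the season can be derived from the month number alone, with no day-of-month string comparison at all. A minimal plain-Python sketch of that mapping (`SEASON_BY_MONTH` and `season_of` are hypothetical names, not part of any library):

```python
from datetime import date

# Every season starts on the 1st of a month, so the month number
# alone determines the season.
SEASON_BY_MONTH = {
    3: "Spring", 4: "Spring", 5: "Spring",
    6: "Summer", 7: "Summer", 8: "Summer",
    9: "Autumn", 10: "Autumn", 11: "Autumn",
    12: "Winter", 1: "Winter", 2: "Winter",
}


def season_of(d: date) -> str:
    """Return the season name for a date, using the boundaries listed above."""
    return SEASON_BY_MONTH[d.month]


print(season_of(date(2014, 2, 1)))   # Winter
print(season_of(date(2013, 12, 1)))  # Winter
print(season_of(date(2015, 5, 31)))  # Spring
```

In Spark SQL the same idea could be written as `CASE WHEN month(requestDate) IN (3, 4, 5) THEN 'Spring' ... END`.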
What I tried:
I tried the following two approaches:
case
when to_char(StartDate,'MMDD') between '0301' and '0531' then 'spring'
.....
.....
end as season
but it didn't work. The to_char logic works in Oracle DB, but after looking around I found that Spark SQL doesn't have this function. I also tried:
import org.apache.spark.sql.functions._
val dateDF1 = orvPartRequestsDF.withColumn("MMDD", concat_ws("-", month($"requestDate"), dayofmonth($"requestDate")))
%sql
select distinct requestDate, MMDD,
case
when MMDD between '3-1' and '5-31' then 'Spring'
when MMDD between '6-1' and '8-31' then 'Summer'
when MMDD between '9-1' and '11-30' then 'Autumn'
when MMDD between '12-1' and '2-28' then 'Winter'
end as season
from temporal
That didn't work either. What am I missing here, and how can I solve this? My guess is that I can't compare strings like this, but I'm not sure.
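The guess about string comparison is on the right track: both operands of `BETWEEN` here are strings, so they are compared character by character, not as dates. The small plain-Python illustration below assumes Spark's string ordering matches Python's for these ASCII digit strings. The helpers `unpadded`, `padded` and `between` are hypothetical stand-ins for `concat_ws("-", month(...), dayofmonth(...))`, `date_format(..., "MMdd")` and SQL `BETWEEN`.

```python
def unpadded(month: int, day: int) -> str:
    # Like concat_ws("-", month(d), dayofmonth(d)): no zero padding, e.g. "5-4".
    return f"{month}-{day}"


def padded(month: int, day: int) -> str:
    # Like date_format(d, "MMdd"): always four digits, e.g. "0504".
    return f"{month:02d}{day:02d}"


def between(x: str, lo: str, hi: str) -> bool:
    # SQL "x BETWEEN lo AND hi" means "lo <= x AND x <= hi".
    return lo <= x <= hi


# 4 May drops out of Spring: '5-4' > '5-31' because '4' > '3'.
print(between(unpadded(5, 4), "3-1", "5-31"))     # False
print(between(padded(5, 4), "0301", "0531"))      # True

# The Autumn bounds are inverted as strings ('9-1' > '11-30'), so 1 October fails.
print(between(unpadded(10, 1), "9-1", "11-30"))   # False
print(between(padded(10, 1), "0901", "1130"))     # True

# Winter wraps the year end; '1-1' sorts before '12-1', so January fails.
print(between(unpadded(1, 1), "12-1", "2-28"))    # False
```

Zero-padded `MMdd` strings do sort in calendar order, but a winter range still has to be split into December and January/February, because it wraps past the end of the year.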
Update after trying JXC's solution #1 with RANGE BETWEEN:
Since I was seeing some discrepancies, I am sharing the DataFrame again. The following is the DataFrame seasonDF12:
+-------+-----------+--------------+------+----------+
| itemno|requestYear|totalRequested|season|seasonCalc|
+-------+-----------+--------------+------+----------+
|0450000| 2011| 0.0|Winter| 201075|
|0450000| 2011| 0.0|Winter| 201075|
|0450000| 2011| 0.0|Spring| 201100|
|0450000| 2011| 0.0|Spring| 201100|
|0450000| 2011| 0.0|Spring| 201100|
|0450000| 2011| 0.0|Summer| 201125|
|0450000| 2011| 0.0|Summer| 201125|
|0450000| 2011| 0.0|Summer| 201125|
|0450000| 2011| 0.0|Autumn| 201150|
|0450000| 2011| 0.0|Autumn| 201150|
|0450000| 2011| 0.0|Autumn| 201150|
|0450000| 2011| 0.0|Winter| 201175|
|0450000| 2012| 3.0|Winter| 201175|
|0450000| 2012| 1.0|Winter| 201175|
|0450000| 2012| 4.0|Spring| 201200|
|0450000| 2012| 0.0|Spring| 201200|
|0450000| 2012| 0.0|Spring| 201200|
|0450000| 2012| 2.0|Summer| 201225|
|0450000| 2012| 3.0|Summer| 201225|
|0450000| 2012| 2.0|Summer| 201225|
+-------+-----------+--------------+------+----------+
to which I apply:
val seasonDF2 = seasonDF12.selectExpr("*", """
sum(totalRequested) OVER (
PARTITION BY itemno
ORDER BY seasonCalc
RANGE BETWEEN 100 PRECEDING AND CURRENT ROW
) AS sum_totalRequested
""")
and in the result, the first non-zero value in the sum_totalRequested column is 40, although all the entries above it are 0. I'm not sure why it's 40.
As mentioned above, I need this DataFrame to be transformed into the following columns:
itemno, startDateOfSeason, endDateOfSeason, sum_totalRequestedBySeason, and the ratio totalRequestedInCurrentSeason / (totalRequested in the current season + the previous 3 seasons).
The final output should look like this:
itemno | startDateOfSeason | endDateOfSeason | season | sum_totalRequestedBySeason | ratio (current / (current + previous 3))
123    | 12/01/2018        | 02/28/2019      | winter | 12                         | 12/(12 + 36)  (36 from the previous three seasons)
123    | 03/01/2019        | 05/31/2019      | spring | 24                         | 24/(24 + 45)  (45 from the previous three seasons)
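Spelled out, the last column is current / (current + sum of the previous three seasons). Using only the numbers from the two example rows, a tiny sketch of that arithmetic (`season_ratio` is a hypothetical helper name):

```python
def season_ratio(current: float, previous_three: float) -> float:
    """Share of the current season in the 4-season total."""
    return current / (current + previous_three)


print(round(season_ratio(12, 36), 2))  # 0.25
print(round(season_ratio(24, 45), 2))  # 0.35
```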
Edit-2: adjusted to calculate the sums grouped by season first, and then the Window aggregate sum.
Edit-1: Based on the comments, the season name is not required. We can set Spring, Summer, Autumn and Winter to 0, 25, 50 and 75 respectively and encode each season as an integer label, year*100 + offset (for winter, the year in which its December falls), so that we can use rangeBetween in Window aggregate functions. Since consecutive labels are 25 apart, an offset of -75 covers the current plus the previous 3 seasons; an offset of -100 would also pull in a fifth season.
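The label arithmetic and the frame of `RANGE BETWEEN n PRECEDING AND CURRENT ROW` can be checked without Spark. A plain-Python sketch, where `OFFSET`, `season_label` and `labels_in_frame` are hypothetical names, lists which labels a frame ending at the current row would cover. RANGE frames include both bounds, so a frame 75 wide spans four consecutive seasons and a frame 100 wide spans five.

```python
# Offsets within a year, as described above (hypothetical constant name).
OFFSET = {"spring": 0, "summer": 25, "autumn": 50, "winter": 75}


def season_label(start_year: int, season: str) -> int:
    # year * 100 + offset; a winter uses the year in which its December falls.
    return start_year * 100 + OFFSET[season]


def labels_in_frame(current: int, all_labels, preceding: int):
    # Labels covered by "RANGE BETWEEN <preceding> PRECEDING AND CURRENT ROW"
    # when ordering by label: every label in [current - preceding, current].
    return [x for x in all_labels if current - preceding <= x <= current]


labels = [season_label(y, s)
          for y in (2010, 2011)
          for s in ("spring", "summer", "autumn", "winter")]
current = season_label(2011, "spring")  # 201100

print(labels_in_frame(current, labels, 75))   # [201025, 201050, 201075, 201100]
print(labels_in_frame(current, labels, 100))  # [201000, 201025, 201050, 201075, 201100]
```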
Note: the code below is PySpark:
df.createOrReplaceTempView("df_table")
df1 = spark.sql("""
WITH t1 AS ( SELECT *
, year(requestDate) as YY
, date_format(requestDate, "MMdd") as MMDD
FROM df_table )
, t2 AS ( SELECT *,
CASE
WHEN MMDD BETWEEN '0301' AND '0531' THEN
named_struct(
'startDateOfSeason', date(concat_ws('-', YY, '03-01'))
, 'endDateOfSeason', date(concat_ws('-', YY, '05-31'))
, 'season', 'spring'
, 'label', int(YY)*100
)
WHEN MMDD BETWEEN '0601' AND '0831' THEN
named_struct(
'startDateOfSeason', date(concat_ws('-', YY, '06-01'))
, 'endDateOfSeason', date(concat_ws('-', YY, '08-31'))
, 'season', 'summer'
, 'label', int(YY)*100 + 25
)
WHEN MMDD BETWEEN '0901' AND '1130' THEN
named_struct(
'startDateOfSeason', date(concat_ws('-', YY, '09-01'))
, 'endDateOfSeason', date(concat_ws('-', YY, '11-30'))
, 'season', 'autumn'
, 'label', int(YY)*100 + 50
)
WHEN MMDD BETWEEN '1201' AND '1231' THEN
named_struct(
'startDateOfSeason', date(concat_ws('-', YY, '12-01'))
, 'endDateOfSeason', last_day(concat_ws('-', int(YY)+1, '02-28'))
, 'season', 'winter'
, 'label', int(YY)*100 + 75
)
WHEN MMDD BETWEEN '0101' AND '0229' THEN
named_struct(
'startDateOfSeason', date(concat_ws('-', int(YY)-1, '12-01'))
, 'endDateOfSeason', last_day(concat_ws('-', YY, '02-28'))
, 'season', 'winter'
, 'label', (int(YY)-1)*100 + 75
)
END AS seasons
FROM t1
)
SELECT itemno
, seasons.*
, sum(totalRequested) AS sum_totalRequestedBySeason
FROM t2
GROUP BY itemno, seasons
""")
This gives the following result:
df1.show()
+-----------+-----------------+---------------+------+------+--------------------------+
| itemno|startDateOfSeason|endDateOfSeason|season| label|sum_totalRequestedBySeason|
+-----------+-----------------+---------------+------+------+--------------------------+
|5436134-070| 2013-12-01| 2014-02-28|winter|201375| 8.0|
| 1824299| 2015-03-01| 2015-05-31|spring|201500| 1.0|
| 0453978| 2014-09-01| 2014-11-30|autumn|201450| 18.0|
| 7181584| 2016-12-01| 2017-02-28|winter|201675| 4.0|
| 7178940| 2014-09-01| 2014-11-30|autumn|201450| 5.0|
| 7547385| 2013-12-01| 2014-02-28|winter|201375| 89.0|
| 5814215| 2017-03-01| 2017-05-31|spring|201700| 35.0|
| 3086317| 2014-09-01| 2014-11-30|autumn|201450| 1.0|
| 0450636| 2014-12-01| 2015-02-28|winter|201475| 7.0|
| 2204331| 2015-03-01| 2015-05-31|spring|201500| 2.0|
|5437474-620| 2015-06-01| 2015-08-31|summer|201525| 4.0|
| 5133406| 2014-03-01| 2014-05-31|spring|201400| 46.0|
| 7081417| 2017-03-01| 2017-05-31|spring|201700| 15.0|
| 7519278| 2013-03-01| 2013-05-31|spring|201300| 96.0|
| 7558402| 2014-09-01| 2014-11-30|autumn|201450| 260.0|
| 2204858| 2015-12-01| 2016-02-29|winter|201575| 34.0|
|5437662-070| 2013-06-01| 2013-08-31|summer|201325| 78.0|
| 5334160| 2017-12-01| 2018-02-28|winter|201775| 2.0|
| 3089858| 2014-09-01| 2014-11-30|autumn|201450| 5.0|
| 7512365| 2013-12-01| 2014-02-28|winter|201375| 110.0|
+-----------+-----------------+---------------+------+------+--------------------------+
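To see what the CASE + GROUP BY step produces without a cluster, here is a plain-Python sketch of the same logic. `season_info` and `totals_by_season` are hypothetical names, and the input rows are made-up illustrative values. Grouping first collapses several requests for the same item and season into one row, so the later RANGE window sees one row per season rather than every individual request.

```python
from calendar import monthrange
from collections import defaultdict
from datetime import date


def season_info(d):
    """Return (startDateOfSeason, endDateOfSeason, season, label) for a date.

    Mirrors the CASE branches above: January and February belong to the
    winter that started on 1 December of the previous year.
    """
    y, m = d.year, d.month
    if 3 <= m <= 5:
        return date(y, 3, 1), date(y, 5, 31), "spring", y * 100
    if 6 <= m <= 8:
        return date(y, 6, 1), date(y, 8, 31), "summer", y * 100 + 25
    if 9 <= m <= 11:
        return date(y, 9, 1), date(y, 11, 30), "autumn", y * 100 + 50
    start = y if m == 12 else y - 1
    feb_days = monthrange(start + 1, 2)[1]  # 28 or 29, like last_day()
    return date(start, 12, 1), date(start + 1, 2, feb_days), "winter", start * 100 + 75


def totals_by_season(rows):
    """Sum totals per (itemno, season), like GROUP BY itemno, seasons."""
    sums = defaultdict(float)
    for itemno, request_date, total in rows:
        sums[(itemno,) + season_info(request_date)] += total
    return dict(sums)


# Hypothetical input: one item requested in December 2013 and January 2014.
rows = [("item-A", date(2013, 12, 1), 8.0), ("item-A", date(2014, 1, 1), 89.0)]
print(totals_by_season(rows))  # a single winter row (label 201375) with total 97.0
```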
Once we have the season totals, we calculate the sum over the current plus the previous 3 seasons with a Window aggregate function, and then the ratio:
# labels are 25 apart, so 75 PRECEDING spans the current season plus the previous 3
df1.selectExpr("*", """
round(sum_totalRequestedBySeason/sum(sum_totalRequestedBySeason) OVER (
PARTITION BY itemno
ORDER BY label
RANGE BETWEEN 75 PRECEDING AND CURRENT ROW
),2) AS ratio_of_current_over_current_plus_past_3_seasons
""").show()
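As a sanity check of what the windowed ratio computes, here is a plain-Python sketch over per-season totals. `ratio_over_last_four` is a hypothetical name, and the totals for item 123 are made-up numbers, not the question's data. It assumes one row per item and season (the output of the GROUP BY step) and mimics Spark's non-ANSI behaviour of returning NULL for division by zero.

```python
from collections import defaultdict


def ratio_over_last_four(season_totals, preceding=75):
    """season_totals: (itemno, label, total) tuples, one per item and season.

    Each total is divided by the sum of the same item's totals whose label is
    in [label - preceding, label], i.e. the frame of
    RANGE BETWEEN <preceding> PRECEDING AND CURRENT ROW, rounded to 2 places.
    """
    by_item = defaultdict(list)
    for itemno, label, total in season_totals:
        by_item[itemno].append((label, total))

    result = []
    for itemno, rows in by_item.items():
        for label, total in rows:
            window = sum(t for lbl, t in rows if label - preceding <= lbl <= label)
            # Spark SQL (non-ANSI mode) yields NULL for x / 0; None stands in here.
            ratio = round(total / window, 2) if window else None
            result.append((itemno, label, ratio))
    return result


# Hypothetical season totals for item 123 (illustrative numbers only).
totals = [("123", 201800, 10.0), ("123", 201825, 20.0), ("123", 201850, 6.0),
          ("123", 201875, 12.0), ("123", 201900, 24.0)]
for row in ratio_over_last_four(totals):
    print(row)  # e.g. ('123', 201875, 0.25): 12 / (10 + 20 + 6 + 12)
```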