a.r. noureldin
a.r. noureldin

Reputation: 63

scala spark - groupBy to find mean between months in a date range

I am looking at this drone rental dataset. I would like to try grouping by the Result column in Spark to show the mean result ($) each drone made as a function of the days it spent in that month.

ie. the value in the Result column divided by total days then attributed to the days in each month between the start and end dates

+------+------------------+------------------+--------+
| Drone|     Start        |      End         | Result |
+------+------------------+------------------+--------+
| DR1    16/06/2013 10:30   22/08/2013 07:00    2786  |
| DR1    20/04/2013 23:30   16/06/2013 10:30    7126  |
| DR1    24/01/2013 23:00   20/04/2013 23:30    2964  |
| DR2    01/03/2014 19:00   07/05/2014 18:00    8884  |
| DR2    04/09/2015 09:00   04/11/2015 07:00    7828  |
| DR2    04/10/2013 05:00   24/12/2013 07:00    5700  |
+-----------------------------------------------------+

This is difficult as it is a longer-term rental business and not values associated with one date and so a simple groupBy isn't working for me.

kindly please note the drone is hired on a per minute basis in the full dataset is a little more messy.

I would appreciate some help on the correct thought process for approaching a problem like this and what the code would look like.

How would you change what I have written below to consider each month as a separate case? (I only can base it on the start date) :/

val df_avgs = df.groupBy("Start").mean()
df_avgs.select($"Date",$"avg(Result)").show()

taking the first example from each drone type, my expected output would be:

+------+-------+-------+---------+
|Drone | Month | Days  |   Avg   |
+------+-------+-------+---------+
|DR1     June      X       $YY   |
|DR1     July      X       $YY   |
|DR1     August    X       $YY   |
|DR2     March     Y       $ZZ   |
|DR2     April     Y       $ZZ   |
|DR2     May       Y       $ZZ   |
+--------------------------------+

Thanks so much

Upvotes: 0

Views: 1413

Answers (1)

stack0114106
stack0114106

Reputation: 8721

Could you please check this out?. I have used "MMM-yy" in the date-formatting so that if the start and end date spans across years, then it will distinguish easily. You could change that to "MMM" if you just need the month alone.

scala> val df_t = Seq(("DR1","16/06/2013 10:30","22/08/2013 07:00",2786),("DR1","20/04/2013 23:30","16/06/2013 10:30",7126),("DR1","24/01/2013 23:00","20/04/2013 23:30",2964),("DR2","01/03/2014 19:00","07/05/2014 18:00",8884),("DR2","04/09/2015 09:00","04/11/2015 07:00",7828),("DR2","04/10/2013 05:00","24/12/2013 07:00",5700)).toDF("drone","start","end","result")
df_t: org.apache.spark.sql.DataFrame = [drone: string, start: string ... 2 more fields]

scala> val df = df_t.withColumn("start",to_timestamp('start,"dd/MM/yyyy HH:mm")).withColumn("end",to_timestamp('end,"dd/MM/yyyy HH:mm"))
df: org.apache.spark.sql.DataFrame = [drone: string, start: timestamp ... 2 more fields]

scala> df.show(false)
+-----+-------------------+-------------------+------+
|drone|start              |end                |result|
+-----+-------------------+-------------------+------+
|DR1  |2013-06-16 10:30:00|2013-08-22 07:00:00|2786  |
|DR1  |2013-04-20 23:30:00|2013-06-16 10:30:00|7126  |
|DR1  |2013-01-24 23:00:00|2013-04-20 23:30:00|2964  |
|DR2  |2014-03-01 19:00:00|2014-05-07 18:00:00|8884  |
|DR2  |2015-09-04 09:00:00|2015-11-04 07:00:00|7828  |
|DR2  |2013-10-04 05:00:00|2013-12-24 07:00:00|5700  |
+-----+-------------------+-------------------+------+


scala> :paste
// Entering paste mode (ctrl-D to finish)

def months_range(a:java.sql.Date,b:java.sql.Date):Seq[String]=
{
import java.time._
import java.time.format._
val start = a.toLocalDate
val end = b.toLocalDate
(start.toEpochDay until end.toEpochDay).map(LocalDate.ofEpochDay(_)).map(DateTimeFormatter.ofPattern("MMM-yy").format(_)).toSet.toSeq
}

// Exiting paste mode, now interpreting.

months_range: (a: java.sql.Date, b: java.sql.Date)Seq[String]

scala> val udf_months_range = udf(  months_range(_:java.sql.Date,_:java.sql.Date):Seq[String] )
udf_months_range: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,ArrayType(StringType,true),Some(List(DateType, DateType)))

scala> val df2 = df.withColumn("days",datediff('end,'start)).withColumn("diff_months",udf_months_range('start,'end))
df2: org.apache.spark.sql.DataFrame = [drone: string, start: timestamp ... 4 more fields]

scala> df2.show(false)
+-----+-------------------+-------------------+------+----+--------------------------------+
|drone|start              |end                |result|days|diff_months                     |
+-----+-------------------+-------------------+------+----+--------------------------------+
|DR1  |2013-06-16 10:30:00|2013-08-22 07:00:00|2786  |67  |[Jun-13, Jul-13, Aug-13]        |
|DR1  |2013-04-20 23:30:00|2013-06-16 10:30:00|7126  |57  |[Apr-13, May-13, Jun-13]        |
|DR1  |2013-01-24 23:00:00|2013-04-20 23:30:00|2964  |86  |[Jan-13, Feb-13, Mar-13, Apr-13]|
|DR2  |2014-03-01 19:00:00|2014-05-07 18:00:00|8884  |67  |[Mar-14, Apr-14, May-14]        |
|DR2  |2015-09-04 09:00:00|2015-11-04 07:00:00|7828  |61  |[Sep-15, Oct-15, Nov-15]        |
|DR2  |2013-10-04 05:00:00|2013-12-24 07:00:00|5700  |81  |[Oct-13, Nov-13, Dec-13]        |
+-----+-------------------+-------------------+------+----+--------------------------------+


scala> df2.withColumn("month",explode('diff_months)).withColumn("Avg",'result/'days).select("drone","month","days","avg").show(false)
+-----+------+----+------------------+
|drone|month |days|avg               |
+-----+------+----+------------------+
|DR1  |Jun-13|67  |41.582089552238806|
|DR1  |Jul-13|67  |41.582089552238806|
|DR1  |Aug-13|67  |41.582089552238806|
|DR1  |Apr-13|57  |125.01754385964912|
|DR1  |May-13|57  |125.01754385964912|
|DR1  |Jun-13|57  |125.01754385964912|
|DR1  |Jan-13|86  |34.46511627906977 |
|DR1  |Feb-13|86  |34.46511627906977 |
|DR1  |Mar-13|86  |34.46511627906977 |
|DR1  |Apr-13|86  |34.46511627906977 |
|DR2  |Mar-14|67  |132.59701492537314|
|DR2  |Apr-14|67  |132.59701492537314|
|DR2  |May-14|67  |132.59701492537314|
|DR2  |Sep-15|61  |128.327868852459  |
|DR2  |Oct-15|61  |128.327868852459  |
|DR2  |Nov-15|61  |128.327868852459  |
|DR2  |Oct-13|81  |70.37037037037037 |
|DR2  |Nov-13|81  |70.37037037037037 |
|DR2  |Dec-13|81  |70.37037037037037 |
+-----+------+----+------------------+


scala>

EDIT1

Splitting on based on the number of days in each month. The code has to be changed from UDF.

scala> :paste
// Entering paste mode (ctrl-D to finish)

def months_range(a:java.sql.Date,b:java.sql.Date)=
{
import java.time._
import java.time.format._
val start = a.toLocalDate
val end = b.toLocalDate
(start.toEpochDay until end.toEpochDay).map(LocalDate.ofEpochDay(_)).map(DateTimeFormatter.ofPattern("MMM-yy").format(_)).groupBy(identity).map( x => (x._1,x._2.length) )
}

// Exiting paste mode, now interpreting.

months_range: (a: java.sql.Date, b: java.sql.Date)scala.collection.immutable.Map[String,Int]

scala> val udf_months_range = udf(  months_range(_:java.sql.Date,_:java.sql.Date):Map[String,Int] )
udf_months_range: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,MapType(StringType,IntegerType,false),Some(List(DateType, DateType)))

scala>  val df2 = df.withColumn("days",datediff('end,'start)).withColumn("diff_months",udf_months_range('start,'end))
df2: org.apache.spark.sql.DataFrame = [drone: string, start: timestamp ... 4 more fields]

scala> val df3=df2.select(col("*"),explode('diff_months).as(Seq("month","month_days")) ).withColumn("mnth_rent",'result*('month_days/'days)).select("drone","month","month_days","days","mnth_rent")
df3: org.apache.spark.sql.DataFrame = [drone: string, month: string ... 3 more fields]

scala> df3.show(false)
+-----+------+----------+----+------------------+
|drone|month |month_days|days|mnth_rent         |
+-----+------+----------+----+------------------+
|DR1  |Aug-13|21        |67  |873.223880597015  |
|DR1  |Jul-13|31        |67  |1289.044776119403 |
|DR1  |Jun-13|15        |67  |623.7313432835821 |
|DR1  |May-13|31        |57  |3875.543859649123 |
|DR1  |Apr-13|11        |57  |1375.1929824561403|
|DR1  |Jun-13|15        |57  |1875.2631578947367|
|DR1  |Apr-13|19        |86  |654.8372093023256 |
|DR1  |Feb-13|28        |86  |965.0232558139536 |
|DR1  |Mar-13|31        |86  |1068.4186046511627|
|DR1  |Jan-13|8         |86  |275.72093023255815|
|DR2  |Apr-14|30        |67  |3977.910447761194 |
|DR2  |Mar-14|31        |67  |4110.507462686567 |
|DR2  |May-14|6         |67  |795.5820895522388 |
|DR2  |Nov-15|3         |61  |384.983606557377  |
|DR2  |Oct-15|31        |61  |3978.1639344262294|
|DR2  |Sep-15|27        |61  |3464.8524590163934|
|DR2  |Nov-13|30        |81  |2111.111111111111 |
|DR2  |Oct-13|28        |81  |1970.3703703703702|
|DR2  |Dec-13|23        |81  |1618.5185185185185|
+-----+------+----------+----+------------------+


scala> df3.groupBy('drone,'month).agg(sum('month_days).as("s_month_days"),sum('mnth_rent).as("mnth_rent"),max('days).as("days")).orderBy('drone,'month).show(false)
+-----+------+------------+------------------+----+
|drone|month |s_month_days|mnth_rent         |days|
+-----+------+------------+------------------+----+
|DR1  |Apr-13|30          |2030.030191758466 |86  |
|DR1  |Aug-13|21          |873.223880597015  |67  |
|DR1  |Feb-13|28          |965.0232558139536 |86  |
|DR1  |Jan-13|8           |275.72093023255815|86  |
|DR1  |Jul-13|31          |1289.044776119403 |67  |
|DR1  |Jun-13|30          |2498.994501178319 |67  |
|DR1  |Mar-13|31          |1068.4186046511627|86  |
|DR1  |May-13|31          |3875.543859649123 |57  |
|DR2  |Apr-14|30          |3977.910447761194 |67  |
|DR2  |Dec-13|23          |1618.5185185185185|81  |
|DR2  |Mar-14|31          |4110.507462686567 |67  |
|DR2  |May-14|6           |795.5820895522388 |67  |
|DR2  |Nov-13|30          |2111.111111111111 |81  |
|DR2  |Nov-15|3           |384.983606557377  |61  |
|DR2  |Oct-13|28          |1970.3703703703702|81  |
|DR2  |Oct-15|31          |3978.1639344262294|61  |
|DR2  |Sep-15|27          |3464.8524590163934|61  |
+-----+------+------------+------------------+----+


scala>

Upvotes: 1

Related Questions