Reputation: 23
I'm creating a Python script that is supposed to be executed automatically on a regular basis. As it uses a calendar DataFrame, it is essential that this calendar updates itself automatically.
Therefore, I would like to create a Spark DataFrame that contains every YearMonth key between a specific start date and today (excluding today's month). Given the start date 2015-01-01 and the current date 2020-09-08, it should look like this:
--------------
YearMonth
--------------
201501
201502
201503
201504
201505
(...)
202006
202007
202008
--------------
How can this be achieved, so that every time the script is executed it outputs the DataFrame using the last completed month as the end date?
EDIT: Since I managed to extract the YYYYMM key from a YYYY-MM-DD date field, it would also be possible to get a DataFrame with all dates between the start date and today; extracting the keys and removing duplicates could then be done afterwards.
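A minimal PySpark sketch of that idea (assuming Spark 2.4+ for sequence(); the names days and year_month are just illustrative):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# One row per day from the start date up to the end of the last completed month.
days = spark.sql("""
    SELECT explode(sequence(
        to_date('2015-01-01'),
        last_day(add_months(current_date(), -1)),
        interval 1 day
    )) AS d
""")

# Extract the YYYYMM key and collapse each month's days to a single row.
year_month = days.select(F.date_format("d", "yyyyMM").alias("YearMonth")).distinct().orderBy("YearMonth")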
Upvotes: 1
Views: 367
Reputation: 2208
Scala code
You can do the following:
import java.time.{LocalDate, YearMonth}
import java.time.temporal.ChronoUnit

val start = LocalDate.of(2015, 1, 1) // you can pass these values from input args as well
val now = LocalDate.now() // 2020-09-08
val numMonths = ChronoUnit.MONTHS.between(YearMonth.from(start), YearMonth.from(now)).toInt
val dateStream: Stream[LocalDate] = start #:: dateStream.map(_.plusMonths(1)) // lazy, self-referential stream of month starts
val dates = dateStream.take(numMonths).map(t => (t.getYear, t.getMonthValue)).toVector // take(numMonths) excludes the current, incomplete month
Output:
scala> import java.time.{LocalDate, YearMonth}
import java.time.{LocalDate, YearMonth}
scala> import java.time.temporal.ChronoUnit
import java.time.temporal.ChronoUnit
scala> val start = LocalDate.of(2015, 1, 1) // you can pass these values from input args as well
start: java.time.LocalDate = 2015-01-01
scala> val now = LocalDate.now() // 2020-09-08
now: java.time.LocalDate = 2020-09-08
scala> val numMonths = ChronoUnit.MONTHS.between(YearMonth.from(start), YearMonth.from(now)).toInt
numMonths: Int = 68
scala> val dateStream: Stream[LocalDate] = start #:: dateStream.map(_.plusMonths(1))
dateStream: Stream[java.time.LocalDate] = Stream(2015-01-01, ?)
scala> val dates = dateStream.take(numMonths).map(t => (t.getYear, t.getMonthValue)).toVector
dates: scala.collection.immutable.Vector[(Int, Int)] = Vector((2015,1), (2015,2), (2015,3), (2015,4), (2015,5), (2015,6), (2015,7), (2015,8), (2015,9), (2015,10), (2015,11), (2015,12), (2016,1), (2016,2), (2016,3), (2016,4), (2016,5), (2016,6), (2016,7), (2016,8), (2016,9), (2016,10), (2016,11), (2016,12), (2017,1), (2017,2), (2017,3), (2017,4), (2017,5), (2017,6), (2017,7), (2017,8), (2017,9), (2017,10), (2017,11), (2017,12), (2018,1), (2018,2), (2018,3), (2018,4), (2018,5), (2018,6), (2018,7), (2018,8), (2018,9), (2018,10), (2018,11), (2018,12), (2019,1), (2019,2), (2019,3), (2019,4), (2019,5), (2019,6), (2019,7), (2019,8), (2019,9), (2019,10), (2019,11), (2019,12), (2020,1), (2020,2), (2020,3), (2020,4), (2020,5), (2020,6), (2020,7), (2020,8))
You can then create the DataFrame from dates like below:
import org.apache.spark.sql.functions.{col, concat, lpad}
import spark.implicits._
// lpad zero-pads the month, so (2015,1) becomes 201501 rather than 20151
dates.toDF("year", "month").withColumn("YearMonth", concat(col("year"), lpad(col("month"), 2, "0"))).select("YearMonth")
(OR)
spark.createDataFrame(spark.sparkContext.parallelize(dates)).toDF("year", "month").withColumn("YearMonth", concat(col("year"), lpad(col("month"), 2, "0"))).select("YearMonth")
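Since numMonths is recomputed from LocalDate.now() on every run, each execution automatically extends the result up to the last completed month, which is exactly the behaviour the script needs.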
References:
https://docs.scala-lang.org/tutorials/FAQ/finding-symbols.html
Scala: Get every combination of the last 24 months
Concatenate columns in Apache Spark DataFrame
Upvotes: 1