pythonnewbie2020

Reputation: 23

Create a Spark DataFrame including Date-Keys between two dates

I'm creating a Python script that is supposed to run automatically on a regular schedule. Since it uses a calendar DataFrame, it is essential that this calendar updates itself automatically.

Therefore, I would like to create a Spark DataFrame that contains every YearMonth key between a specific start date and today (excluding today's month). Given the start date 2015-01-01 and the current date 2020-09-08, it should look like this:

--------------
YearMonth
--------------
201501
201502
201503
201504
201505
(...)
202006
202007
202008
--------------

How can this be achieved, so that every time the script is executed it outputs the DataFrame with the last completed month as the end date?

EDIT: Since I managed to extract the YYYYMM key from a YYYY-MM-DD date field, it would also be possible to get a DataFrame with all dates between the start date and today. Extracting the keys and removing duplicates could then be done afterwards, as in the sketch below.
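For illustration, a minimal sketch of this alternative, assuming Spark 2.4+ (for the SQL sequence function) and an active SparkSession named spark:

import org.apache.spark.sql.functions.{col, current_date, date_format}

// one row per day from the start date through today, then reduce to distinct months
val months = spark.sql(
    "SELECT explode(sequence(to_date('2015-01-01'), current_date(), interval 1 day)) AS d")
  .select(date_format(col("d"), "yyyyMM").as("YearMonth"))
  .dropDuplicates()
  .filter(col("YearMonth") < date_format(current_date(), "yyyyMM")) // exclude the current month
  .orderBy("YearMonth")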

Upvotes: 1

Views: 367

Answers (1)

kavetiraviteja

Reputation: 2208

Scala code

You can do the following:

import java.time.{LocalDate, YearMonth}
import java.time.temporal.ChronoUnit

val start = LocalDate.of(2015, 1, 1) // you can pass these values from input args as well
val now = LocalDate.now() // 2020-09-08
// number of whole months between the start month and the current month
val numMonths = ChronoUnit.MONTHS.between(YearMonth.from(start), YearMonth.from(now)).toInt
// lazily evaluated, infinite stream of month start dates
val dateStream: Stream[LocalDate] = start #:: dateStream.map(_.plusMonths(1))
// take numMonths (not numMonths + 1) so that the current month is excluded
val dates = dateStream.take(numMonths).map(t => (t.getYear, t.getMonthValue)).toVector

Output:

scala> import java.time.{LocalDate, YearMonth}
import java.time.{LocalDate, YearMonth}

scala> import java.time.temporal.ChronoUnit
import java.time.temporal.ChronoUnit

scala> val start = LocalDate.of(2015, 1, 1) // you can pass these values from input args as well
start: java.time.LocalDate = 2015-01-01

scala> val now = LocalDate.now() // 2020-09-08
now: java.time.LocalDate = 2020-09-08

scala> val numMonths = ChronoUnit.MONTHS.between(YearMonth.from(start), YearMonth.from(now)).toInt
numMonths: Int = 68

scala> val dateStream: Stream[LocalDate] = start #:: dateStream.map(_.plusMonths(1))
dateStream: Stream[java.time.LocalDate] = Stream(2015-01-01, ?)

scala> val dates = dateStream.take(numMonths).map(t => (t.getYear, t.getMonthValue)).toVector
dates: scala.collection.immutable.Vector[(Int, Int)] = Vector((2015,1), (2015,2), (2015,3), (2015,4), (2015,5), (2015,6), (2015,7), (2015,8), (2015,9), (2015,10), (2015,11), (2015,12), (2016,1), (2016,2), (2016,3), (2016,4), (2016,5), (2016,6), (2016,7), (2016,8), (2016,9), (2016,10), (2016,11), (2016,12), (2017,1), (2017,2), (2017,3), (2017,4), (2017,5), (2017,6), (2017,7), (2017,8), (2017,9), (2017,10), (2017,11), (2017,12), (2018,1), (2018,2), (2018,3), (2018,4), (2018,5), (2018,6), (2018,7), (2018,8), (2018,9), (2018,10), (2018,11), (2018,12), (2019,1), (2019,2), (2019,3), (2019,4), (2019,5), (2019,6), (2019,7), (2019,8), (2019,9), (2019,10), (2019,11), (2019,12), (2020,1), (2020,2), (2020,3), (2020,4), (2020,5), (2020,6), (2020,7), (2020,8))

You can then convert dates to a DataFrame (an explicit parallelize is not needed, since the implicits handle local collections) and build the zero-padded YearMonth column like below:

import org.apache.spark.sql.functions.{col, concat, lpad}
import spark.implicits._ // needed for toDF on a local collection

val df = dates.toDF("year", "month")
  .withColumn("YearMonth", concat(col("year"), lpad(col("month"), 2, "0"))) // zero-pad the month so (2015,1) becomes 201501, not 20151
  .select("YearMonth")
    (OR)
val df = spark.createDataFrame(dates).toDF("year", "month")
  .withColumn("YearMonth", concat(col("year"), lpad(col("month"), 2, "0")))
  .select("YearMonth")
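As a quick sanity check, the resulting column should come out zero-padded, something like:

df.orderBy("YearMonth").show(3)
// +---------+
// |YearMonth|
// +---------+
// |   201501|
// |   201502|
// |   201503|
// +---------+
// only showing top 3 rows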

References:

https://docs.scala-lang.org/tutorials/FAQ/finding-symbols.html

Scala: Get every combination of the last 24 months

Concatenate columns in Apache Spark DataFrame

Upvotes: 1
