LDropl
LDropl

Reputation: 944

Generating monthly timestamps between two dates in pyspark dataframe

I have some DataFrame with "date" column and I'm trying to generate a new DataFrame with all monthly timestamps between the min and max date from the "date" column.

One of the solution is below:

month_step = 31*60*60*24

min_date, max_date = df.select(min_("date").cast("long"), max_("date").cast("long")).first()

df_ts = spark.range(
    (min_date / month_step) * month_step, 
    ((max_date / month_step) + 1) * month_step,
    month_step
).select(col("id").cast("timestamp").alias("yearmonth"))

df_formatted_ts = df_ts.withColumn(
    "yearmonth",
    f.concat(f.year("yearmonth"), f.lit('-'), format_string("%02d", f.month("yearmonth")))
).select('yearmonth')

df_formatted_ts.orderBy(asc('yearmonth')).show(150, False)

The problem is that I took as a month_step 31 days and its not really correct because some of the months have 30 days and even 28 days. Is possible to somehow make it more precise?

Just as a note: Later I only need year and month values so I will ignore day and time. But anyway because I'm generating timestamps between quite a big date range (between 2001 and 2018) the timestamps shifting.

That's why sometimes some months will be skipped. For example, this snapshot is missing 2010-02:

|2010-01  |
|2010-03  |
|2010-04  |
|2010-05  |
|2010-06  |
|2010-07  |

I checked and there are just 3 months which were skipped from 2001 through 2018.

Upvotes: 5

Views: 12027

Answers (1)

pault
pault

Reputation: 43504

Suppose you had the following DataFrame:

data = [("2000-01-01","2002-12-01")]
df = spark.createDataFrame(data, ["minDate", "maxDate"])
df.show()
#+----------+----------+
#|   minDate|   maxDate|
#+----------+----------+
#|2000-01-01|2002-12-01|
#+----------+----------+

You can add a column date with all of the months in between minDate and maxDate, by following the same approach as my answer to this question.

Just replace pyspark.sql.functions.datediff with pyspark.sql.functions.months_between, and use add_months instead of date_add:

import pyspark.sql.functions as f

df.withColumn("monthsDiff", f.months_between("maxDate", "minDate"))\
    .withColumn("repeat", f.expr("split(repeat(',', monthsDiff), ',')"))\
    .select("*", f.posexplode("repeat").alias("date", "val"))\
    .withColumn("date", f.expr("add_months(minDate, date)"))\
    .select('date')\
    .show(n=50)
#+----------+
#|      date|
#+----------+
#|2000-01-01|
#|2000-02-01|
#|2000-03-01|
#|2000-04-01|
# ...skipping some rows...
#|2002-10-01|
#|2002-11-01|
#|2002-12-01|
#+----------+

Upvotes: 12

Related Questions