Reputation: 63
I'm working on some data preparation for a project I'm involved in. We do most of the work in Databricks, using the underlying Apache Spark for computations on large datasets. Everything is done in PySpark.
My goal is to convert a date variable to a variable yearperiod
, which divides the year into 13 periods of 4 weeks (with some exceptions). The value is a concatenation of the year and the period, e.g. yearperiod = 201513
would be the year 2015, period 13.
I have two tables: yp_table
which contains start and end dates (Edit: type DateType()
) for yearperiods (between 2012 and now, Edit: ~120 rows):
+----------+----------+----------+
| start| end|yearperiod|
+----------+----------+----------+
|2012-01-16|2012-01-29| 201201|
|2012-01-30|2012-02-26| 201202|
|2012-02-27|2012-03-25| 201203|
|2012-03-26|2012-04-22| 201204|
|2012-04-23|2012-05-20| 201205|
|2012-05-21|2012-06-17| 201206|
....
And I have the actual data
table, which contains a Date column (Edit: type StringType()
):
+--------+--------+--------+-----+
| Var1| Var2| Date| Var3|
+--------+--------+--------+-----+
| xxxxxx| xxxx|20191231| x,xx|
| xxxxxx| xxxx|20191231| x,xx|
| xxxxxx| xxxx|20191231| x,xx|
| xxxxxx| xxxx|20200101| x,xx|
| xxxxxx| xxxx|20200101| x,xx|
| xxxxxx| xxxx|20200101| x,xx|
| xxxxxx| xxxx|20200101| x,xx|
...
My question: how do I compute a column yearperiod
for the data
table, by comparing data.Date
with both yp_table.start
and yp_table.end
?
So far I've been able to make it work with regular Python (a solution with list comprehensions), but it proves to be too slow for large datasets. Any help is greatly appreciated!
Edit: for privacy reasons I can't give the actual schemas of the dataframes. I've edited above to include the types of the relevant columns.
Upvotes: 1
Views: 79
Reputation: 4089
Add a column to your data
df that contains the dates in the matching format to the yp_table
and then join them filtering by date intervals. Since the yp_table
is small, you can use a broadcast join to speed things up.
import pyspark.sql.functions as fun
# Date lookup
start_dates = ["2012-01-16", "2012-01-30", "2012-02-27", "2012-03-26", "2012-04-23", "2012-05-21"]
end_dates = ["2012-01-29", "2012-02-26", "2012-03-25", "2012-04-22", "2012-05-20", "2012-06-17"]
yearperiod = ["201201", "201202", "201203", "201204", "201205", "201206"]
yp_table = spark.createDataFrame(pd.DataFrame({'start': start_dates, 'end': end_dates, 'yearperiod': yearperiod}))
# Data df
dates = ["20120116", "20120130", "20120228", "20120301", "20200101", "20200101", "20200101"]
vals = range(0, len(dates))
data = spark.createDataFrame(pd.DataFrame({'Dates':dates, 'vals': vals}))
# Add formatted data_str column for joining
data = data.withColumn("date_str", fun.concat_ws("-", data.Dates.substr(0,4), data.Dates.substr(5,2), data.Dates.substr(7,2))) # + "-" + data.Dates.substr(6,8))
# Broadcase join small yp_table into the data table using conditional
joined = data.join(fun.broadcast(yp_table), (data.date_str >= yp_table.start) & (data.date_str < yp_table.end))
yp_table.show()
data.show()
joined.show()
+----------+----------+----------+
| start| end|yearperiod|
+----------+----------+----------+
|2012-01-16|2012-01-29| 201201|
|2012-01-30|2012-02-26| 201202|
|2012-02-27|2012-03-25| 201203|
|2012-03-26|2012-04-22| 201204|
|2012-04-23|2012-05-20| 201205|
|2012-05-21|2012-06-17| 201206|
+----------+----------+----------+
+--------+----+----------+
| Dates|vals| date_str|
+--------+----+----------+
|20120116| 0|2012-01-16|
|20120130| 1|2012-01-30|
|20120228| 2|2012-02-28|
|20120301| 3|2012-03-01|
|20200101| 4|2020-01-01|
|20200101| 5|2020-01-01|
|20200101| 6|2020-01-01|
+--------+----+----------+
+--------+----+----------+----------+----------+----------+
| Dates|vals| date_str| start| end|yearperiod|
+--------+----+----------+----------+----------+----------+
|20120116| 0|2012-01-16|2012-01-16|2012-01-29| 201201|
|20120130| 1|2012-01-30|2012-01-30|2012-02-26| 201202|
|20120228| 2|2012-02-28|2012-02-27|2012-03-25| 201203|
|20120301| 3|2012-03-01|2012-02-27|2012-03-25| 201203|
+--------+----+----------+----------+----------+----------+
Upvotes: 1