Reputation: 304
So I'm trying to collect_set a group of dates from a dataframe. The problem I'm facing is that the collected dates are not in the order they appear in the dataframe.
Example dataframe (this is a much larger dataset; basically it tracks the beginning date of the week for every single day in a year):
+--------+-----------+----------+
|year_num|week_beg_dt|    cal_dt|
+--------+-----------+----------+
|    2013| 2012-12-31|2012-12-31|
|    2013| 2012-12-31|2013-01-03|
|    2013| 2013-01-07|2013-01-07|
|    2013| 2013-01-07|2013-01-12|
|    2013| 2013-01-14|2013-01-14|
|    2013| 2013-01-14|2013-01-15|
|    2014| 2014-01-01|2014-01-01|
|    2014| 2014-01-01|2014-01-05|
|    2014| 2014-01-07|2014-01-07|
|    2014| 2014-01-07|2014-01-12|
|    2014| 2014-01-15|2014-01-15|
|    2014| 2014-01-15|2014-01-16|
+--------+-----------+----------+
What I'm trying to get to is this:
+--------+------------------------------------+
|year_num|dates                               |
+--------+------------------------------------+
|    2013|[2012-12-31, 2013-01-07, 2013-01-14]|
|    2014|[2014-01-01, 2014-01-07, 2014-01-15]|
+--------+------------------------------------+
I have tried windowing, since collect_set together with groupBy results in an unordered set:
from pyspark.sql import functions as F
from pyspark.sql import Window
w = Window.partitionBy('year_num').orderBy('week_beg_dt')
business_days_ = df2.withColumn('dates', F.collect_set('week_beg_dt').over(w)) \
                    .groupBy('year_num') \
                    .agg(F.max('dates').alias('dates')) \
                    .collect()
But I still end up with unordered sets. Any suggestions as to what I'm doing wrong and how to fix it?
Upvotes: 0
Views: 100
Reputation: 31530
For Spark 2.4+, use the built-in array_sort function on the result of collect_set to get an ordered list.
Example:
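To make the example reproducible, df1 can be built like this (a minimal sketch; assumes an active SparkSession named spark):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

#sample rows matching the output below; note the duplicate row,
#which collect_set will de-duplicate
df1 = spark.createDataFrame(
    [("2013", "2012-12-31", "2012-12-31"),
     ("2013", "2012-12-31", "2012-12-31"),
     ("2013", "2013-01-07", "2013-01-03")],
    ["year_num", "week_beg_dt", "cal_dt"])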
df1.show()
#+--------+-----------+----------+
#|year_num|week_beg_dt|    cal_dt|
#+--------+-----------+----------+
#|    2013| 2012-12-31|2012-12-31|
#|    2013| 2012-12-31|2012-12-31|
#|    2013| 2013-01-07|2013-01-03|
#+--------+-----------+----------+
from pyspark.sql.functions import col, collect_set, array_sort

#without array_sort
df1.groupBy("year_num").agg(collect_set(col("week_beg_dt"))).show(10,False)
#+--------+------------------------+
#|year_num|collect_set(week_beg_dt)|
#+--------+------------------------+
#|2013    |[2013-01-07, 2012-12-31]|
#+--------+------------------------+
#using array_sort
df1.groupBy("year_num").agg(array_sort(collect_set(col("week_beg_dt")))).show(10,False)
#+--------+------------------------------------+
#|year_num|array_sort(collect_set(week_beg_dt))|
#+--------+------------------------------------+
#|2013    |[2012-12-31, 2013-01-07]            |
#+--------+------------------------------------+
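Applied to the question's dataframe (a sketch, assuming df2 has the year_num and week_beg_dt columns shown in the question), this yields the desired per-year list:
from pyspark.sql import functions as F

#collect_set de-duplicates the week-begin dates per year,
#array_sort then orders the resulting array
business_days = df2.groupBy('year_num') \
    .agg(F.array_sort(F.collect_set('week_beg_dt')).alias('dates'))

business_days.show(truncate=False)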
For earlier versions of Spark:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

#udf to sort the collected array
sort_arr_udf = udf(lambda x: sorted(x), ArrayType(StringType()))
df1.groupBy("year_num").agg(sort_arr_udf(collect_set(col("week_beg_dt")))).show(10,False)
#+--------+----------------------------------------+
#|year_num|<lambda>(collect_set(week_beg_dt, 0, 0))|
#+--------+----------------------------------------+
#|2013    |[2012-12-31, 2013-01-07]                |
#+--------+----------------------------------------+
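As an alternative for pre-2.4 versions, the built-in sort_array function (available since Spark 1.5) sorts the collected set without the serialization overhead of a Python udf:
from pyspark.sql.functions import col, collect_set, sort_array

#sort_array orders the array produced by collect_set (ascending by default)
df1.groupBy("year_num").agg(sort_array(collect_set(col("week_beg_dt")))).show(10,False)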
Upvotes: 2