ni_i_ru_sama
ni_i_ru_sama

Reputation: 304

List is unordered even after using windowing with collect_set

So I'm trying to collect_set a group of dates from a dataframe. The problem I'm facing is that the dates are not present in the order the dataframe has.

Example dataframe (this is a much larger dataset. Basically this dataframe tracks the 
beginning date of a week, for every single day in a year)
+--------+-----------+----------+
|year_num|week_beg_dt|    cal_dt|
+--------+-----------+----------+
|    2013| 2012-12-31|2012-12-31|
|    2013| 2012-12-31|2013-01-03|
|    2013| 2013-01-07|2013-01-07|
|    2013| 2013-01-07|2013-01-12|
|    2013| 2013-01-14|2013-01-14|
|    2013| 2013-01-14|2013-01-15|
|    2014| 2014-01-01|2014-01-01|
|    2014| 2014-01-01|2014-01-05|
|    2014| 2014-01-07|2014-01-07|
|    2014| 2014-01-07|2014-01-12|
|    2014| 2014-01-15|2014-01-15|
|    2014| 2014-01-15|2014-01-16|

What Im trying to get to is this
+--------+-------------------------------------+
|year_num|   dates.                            |
+--------+-------------------------------------+
|    2013|[2012-12-31, 2013-01-07, 2013-01-14] |
|    2014|[2014-01-01, 2014-01-07, 2014-01-14] |

I have tried windowing to do it, as collect_set together with groupBy will result in unordered set:

from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('year_num').orderBy('week_beg_dt')

business_days_ = df2.withColumn('dates', F.collect_set('week_beg_dt').over(w)) \
                    .groupBy('year_num') \
                    .agg(F.max('dates').alias('dates')) \
                    .collect()

But I still end up with unordered sets. Any suggestions what I'm doing wrong and how to fix it?

Upvotes: 0

Views: 100

Answers (1)

notNull
notNull

Reputation: 31530

For Spark 2.4+, use array_sort in built function on collect_set to get ordered list.

Example:

df1.show()

#+--------+-----------+----------+
#|year_num|week_beg_dt|    cal_dt|
#+--------+-----------+----------+
#|    2013| 2012-12-31|2012-12-31|
#|    2013| 2012-12-31|2012-12-31|
#|    2013| 2013-01-07|2013-01-03|
#+--------+-----------+----------+

#without array_sort
df1.groupBy("year_num").agg(collect_set(col("week_beg_dt"))).show(10,False)
#+--------+------------------------+
#|year_num|collect_set(week_beg_dt)|
#+--------+------------------------+
#|2013    |[2013-01-07, 2012-12-31]|
#+--------+------------------------+

#using array_sort

df1.groupBy("year_num").agg(array_sort(collect_set(col("week_beg_dt")))).show(10,False)
#+--------+------------------------------------+
#|year_num|array_sort(collect_set(week_beg_dt))|
#+--------+------------------------------------+
#|2013    |[2012-12-31, 2013-01-07]            |
#+--------+------------------------------------+

For earlier versions of Spark:

from pyspark.sql.types import *

#udf to sort array
sort_arr_udf=udf(lambda x:sorted(x),ArrayType(StringType()))

df1.groupBy("year_num").agg(sort_arr_udf(collect_set(col("week_beg_dt")))).show(10,False)

#+--------+----------------------------------------+
#|year_num|<lambda>(collect_set(week_beg_dt, 0, 0))|
#+--------+----------------------------------------+
#|2013    |[2012-12-31, 2013-01-07]                |
#+--------+----------------------------------------+

Upvotes: 2

Related Questions