Reputation: 69
I have two dataframes, each one with a date column. ie:
+-----------+
| DEADLINES|
+-----------+
| 2023-07-15|
| 2018-08-10|
| 2022-03-28|
| 2021-06-22|
| 2021-12-18|
| 2021-10-11|
| 2021-11-13|
+-----------+
+----------+
| DT_DATE|
+----------+
|2021-04-02|
|2021-04-21|
|2021-05-01|
|2021-06-03|
|2021-09-07|
|2021-10-12|
|2021-11-02|
+----------+
I need to count how many dates of DT_DATE
are between a given reference date and each one of DEADLINES
dates.
For example: using 2021-03-31
as reference date should give the following result set.
+-----------+------------+
| DEADLINES| dt_count|
+-----------+------------+
| 2023-07-15| 7|
| 2018-08-10| 0|
| 2022-03-28| 7|
| 2021-06-22| 4|
| 2021-12-18| 7|
| 2021-10-11| 5|
| 2021-11-13| 7|
+-----------+------------+
I managed to make it work iterating through each row of deadlines dataframe but with a larger dataset the performance got very poor.
Does anyone have a better solution?
Edit: thats my current solution:
def count_days(deadlines_df, dates_df, ref_date):
for row in deadlines_df.collect():
qtt = dates_df.filter(dates_df.DT_DATE.between(ref_date, row.DEADLINES)).count()
yield row.DEADLINES, qtt
new_df = spark.createDataFrame(count_days(deadlines_df, dates_df, "2021-03-31"), ["DEADLINES", "dt_count"])
Upvotes: 3
Views: 129
Reputation: 5068
If you have a small number of deadline dates, you can:
dates_df
dataframe, with value is 1
when DT_DATE
is between ref_date
and deadline date and 0
otherwiseLet's see step by step
from pyspark.sql import functions as F
deadline_rows = deadlines_df.collect()
dates_with_deadlines = dates_df
for row in deadline_rows:
dates_with_deadlines = dates_with_deadlines.withColumn(
str(row.DEADLINES),
F.when(
dates_df.DT_DATE.between(ref_date, row.DEADLINES), F.lit(1))
.otherwise(
F.lit(0)
)
)
And you get, with your example, the following dates_with_deadlines
dataframe:
+----------+----------+----------+----------+----------+----------+----------+----------+
|DT_DATE |2023-07-15|2018-08-10|2022-03-28|2021-06-22|2021-12-18|2021-10-11|2021-11-13|
+----------+----------+----------+----------+----------+----------+----------+----------+
|2021-04-02|1 |0 |1 |1 |1 |1 |1 |
|2021-04-21|1 |0 |1 |1 |1 |1 |1 |
|2021-05-01|1 |0 |1 |1 |1 |1 |1 |
|2021-06-03|1 |0 |1 |1 |1 |1 |1 |
|2021-09-07|1 |0 |1 |0 |1 |1 |1 |
|2021-10-12|1 |0 |1 |0 |1 |0 |1 |
|2021-11-02|1 |0 |1 |0 |1 |0 |1 |
+----------+----------+----------+----------+----------+----------+----------+----------+
aggregated_df = dates_with_deadlines.agg(*[F.sum(str(x.DEADLINES)).alias(str(x.DEADLINES)) for x in deadline_rows])
After this step, you get the following aggregated_df
dataframe:
+----------+----------+----------+----------+----------+----------+----------+
|2023-07-15|2018-08-10|2022-03-28|2021-06-22|2021-12-18|2021-10-11|2021-11-13|
+----------+----------+----------+----------+----------+----------+----------+
|7 |0 |7 |4 |7 |5 |7 |
+----------+----------+----------+----------+----------+----------+----------+
result_df = aggregated_df \
.withColumn('merged', F.array(*[F.struct(F.lit(x.DEADLINES).alias('DEADLINES'), F.col(str(x.DEADLINES)).alias('dt_count')) for x in deadline_rows])) \
.drop(*[str(x.DEADLINES) for x in deadline_rows]) \
.withColumn('data', F.explode('merged')) \
.drop('merged') \
.withColumn('DEADLINES', F.col('data.DEADLINES')) \
.withColumn('dt_count', F.col('data.dt_count')) \
.drop('data')
And you have your expected result_df
dataframe:
+----------+--------+
|DEADLINES |dt_count|
+----------+--------+
|2023-07-15|7 |
|2018-08-10|0 |
|2022-03-28|7 |
|2021-06-22|4 |
|2021-12-18|7 |
|2021-10-11|5 |
|2021-11-13|7 |
+----------+--------+
from pyspark.sql import functions as F
deadline_rows = deadlines_df.collect()
dates_with_deadlines = dates_df
for row in deadline_rows:
dates_with_deadlines = dates_with_deadlines.withColumn(
str(row.DEADLINES),
F.when(
dates_df.DT_DATE.between(ref_date, row.DEADLINES), F.lit(1))
.otherwise(
F.lit(0)
)
)
aggregated_df = dates_with_deadlines.agg(*[F.sum(str(x.DEADLINES)).alias(str(x.DEADLINES)) for x in deadline_rows])
result_df = aggregated_df \
.withColumn('merged', F.array(*[F.struct(F.lit(x.DEADLINES).alias('DEADLINES'), F.col(str(x.DEADLINES)).alias('dt_count')) for x in deadline_rows])) \
.drop(*[str(x.DEADLINES) for x in deadline_rows]) \
.withColumn('data', F.explode('merged')) \
.drop('merged') \
.withColumn('DEADLINES', F.col('data.DEADLINES')) \
.withColumn('dt_count', F.col('data.dt_count')) \
.drop('data')
With this solution, the only step that cannot be done using a distributed system is the transpose step.
Moreover, instead of your current solution, we perform all aggregation for each deadline column in parallele, and not sequentially.
However, this solutions works only if there are few deadline dates (hundreds, maybe thousands deadline dates), first because we retrieve all those deadline dates in the Spark driver with .collect()
, second because in first step we create one column per deadline date, creating rows with lot of data, and finally because the last step is also executed on only one executor.
Upvotes: 0
Reputation: 7207
Both dataframes can be united with different weight, and Window function with range from start to current row used (Scala):
val deadlines = Seq(
("2023-07-15"),
("2018-08-10"),
("2022-03-28"),
("2021-06-22"),
("2021-12-18"),
("2021-10-11"),
("2021-11-13")
).toDF("DEADLINES")
val dates = Seq(
("2021-04-02"),
("2021-04-21"),
("2021-05-01"),
("2021-06-03"),
("2021-09-07"),
("2021-10-12"),
("2021-11-02")
).toDF("DT_DATE")
val referenceDate = "2021-03-31"
val united = deadlines.withColumn("weight", lit(0))
.unionAll(
dates
.where($"DT_DATE" >= referenceDate)
.withColumn("weight", lit(1))
)
val fromStartToCurrentRowWindow = Window.orderBy("DEADLINES").rangeBetween(Window.unboundedPreceding, Window.currentRow)
val result = united
.withColumn("dt_count", sum("weight").over(fromStartToCurrentRowWindow))
.where($"weight" === lit(0))
.drop("weight")
Output:
+----------+--------+
|DEADLINES |dt_count|
+----------+--------+
|2018-08-10|0 |
|2021-06-22|4 |
|2021-10-11|5 |
|2021-11-13|7 |
|2021-12-18|7 |
|2022-03-28|7 |
|2023-07-15|7 |
+----------+--------+
Note: calculation will be executed in one partition, Spark shows such warning: WARN Logging - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
Also other solution possible, joining two dataframes by range, which leads to cartesian join.
Upvotes: 2