Reputation: 137
After some iterations and joins by batch_date, I have a dataframe. Let's focus on r_id0==0 and r_id0==1:
+-----+----------+----------+----------+--------------------+--------------------+
|r_id0|batch_date| from| until| desc| hash|
+-----+----------+----------+----------+--------------------+--------------------+
| 0|2020-01-31|2020-01-31|2020-02-27| Esto es un campo 1|b10192333e9c61a40...|
| 0|2020-04-30|2020-04-30|9999-12-31| Esto es un campo 1|b10192333e9c61a40...|
| 1|2020-01-31|2020-01-31|9999-12-31| Esto es un campo 1|b63b3e8201cd417bb...|
| 1|2020-02-28|2020-02-28|9999-12-31| Esto es un campo 1|b63b3e8201cd417bb...|
| 1|2020-03-31|2020-03-31|9999-12-31| Esto es un campo 1|b63b3e8201cd417bb...|
| 2|2020-01-31|2020-01-31|2020-02-27| Esto es un campo 2|b31c138a4b6a96169...|
| 2|2020-02-28|2020-02-28|2020-03-30| Esto es un campo 4|fa1cfe1eaf9b14f88...|
| 2|2020-03-31|2020-03-31|2020-04-29| Esto es un campo 3|e4ffd0cd4a6cf1193...|
| 2|2020-04-30|2020-04-30|9999-12-31| Esto es un campo 23|0fdb24b8fcf8603ee...|
| 3|2020-02-28|2020-02-28|2020-03-30| Esto es un campo 3|a3a6870ca9b42ad06...|
| 3|2020-03-31|2020-03-31|2020-04-29| Esto es un campo 2|18b24f88271e99618...|
| 4|2020-03-31|2020-03-31|2020-04-29| Esto es un campo 5|2fe50db0156cfc909...|
| 6|2020-01-31|2020-01-31|2020-02-27| Esto es un campo 6|7c6d329b73d9de59f...|
| 6|2020-04-30|2020-04-30|9999-12-31|Esto es un campo 77|d70c9340f83167e95...|
+-----+----------+----------+----------+--------------------+--------------------+
I need to remove rows with the same hash column, but only the ones that correspond to consecutive months in the batch_date column. In that case, I keep everything from the first time the duplicate appears, plus the value of the last until column. Non-consecutive duplicates must remain unchanged.
Example:
+-----+----------+----------+----------+--------------------+--------------------+
|r_id0|batch_date| from| until| desc| hash|
+-----+----------+----------+----------+--------------------+--------------------+
| 0|2020-01-31|2020-01-31|2020-02-27| Esto es un campo 1|b10192333e9c61a40...|
| 0|2020-04-30|2020-04-30|9999-12-31| Esto es un campo 1|b10192333e9c61a40...|
| 1|2020-01-31|2020-01-31|9999-12-31| Esto es un campo 1|b63b3e8201cd417bb...|
| 2|2020-01-31|2020-01-31|2020-02-27| Esto es un campo 2|b31c138a4b6a96169...|
etc etc...
I wrote a function that does what I want for r_id0==1, but it also modifies the r_id0==0 case:
from pyspark.sql import functions as F

def removing_duplicates2(final_df):
    '''
    Removing duplicates. It takes everything from the first time they appear and the last "until" column.
    '''
    # remember the original column order for the final select
    columns = final_df.columns
    # we create this list in order to avoid iterating over the column "hash", which will be our groupBy column
    iterating_columns = [c for c in columns if c != "hash"]
    exprs = [F.first(x).alias(x) if x != "until" else F.last(x).alias(x) for x in iterating_columns]
    return (
        final_df.groupBy("hash").agg(*exprs)
        .dropDuplicates(["hash"]).select(columns)
    )
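For reference, the result further below comes from calling this function and displaying the output, along the lines of (the variable name final_df_dedup is just for illustration):
final_df_dedup = removing_duplicates2(final_df)
final_df_dedup.show()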
This is the result, which destroys the r_id0==0 case:
+-----+----------+----------+----------+--------------------+--------------------+
|r_id0|batch_date| from| until| desc| hash|
+-----+----------+----------+----------+--------------------+--------------------+
| 0|2020-01-31|2020-01-31|9999-12-31| Esto es un campo 1|b10192333e9c61a40...|
| 1|2020-01-31|2020-01-31|9999-12-31| Esto es un campo 1|b63b3e8201cd417bb...|
| 2|2020-01-31|2020-01-31|2020-02-27| Esto es un campo 2|b31c138a4b6a96169...|
etc etc
So, I'm quite puzzled. I have to do this with PySpark, and this example will be applied to a huge, damn huge table partitioned by batch_date. I strongly believe every loop I create is a step towards servers exploding and firefighters blaming me (I'm already using one, for iterating over batch_date).
Apologies for the long description. Any advice or suggestion is more than welcome.
Thanks!
Upvotes: 1
Views: 406
Reputation: 14845
The idea is to first identify, using a Window, the groups that contain only consecutive months. Then aggregate only the rows found in this first step, leaving all other rows unchanged.
Create some test data:
from datetime import date
data = [[0, date.fromisoformat("2020-01-31"), date.fromisoformat("2020-01-31"), date.fromisoformat("2020-02-27"), "b10192333e9c61a40"],
        [0, date.fromisoformat("2020-04-30"), date.fromisoformat("2020-04-30"), date.fromisoformat("9999-12-31"), "b10192333e9c61a40"],
        [0, date.fromisoformat("2020-05-01"), date.fromisoformat("2020-04-30"), date.fromisoformat("9999-12-31"), "b10192333e9c61a40"],
        [1, date.fromisoformat("2019-12-31"), date.fromisoformat("2020-01-31"), date.fromisoformat("9999-12-31"), "63b3e8201cd417bb"],
        [1, date.fromisoformat("2020-01-31"), date.fromisoformat("2020-01-31"), date.fromisoformat("9999-12-31"), "63b3e8201cd417bb"],
        [1, date.fromisoformat("2020-02-28"), date.fromisoformat("2020-02-28"), date.fromisoformat("9999-12-31"), "63b3e8201cd417bb"],
        [1, date.fromisoformat("2020-03-31"), date.fromisoformat("2020-03-31"), date.fromisoformat("9999-12-31"), "63b3e8201cd417bb"],
        [2, date.fromisoformat("2020-03-31"), date.fromisoformat("2020-03-31"), date.fromisoformat("9999-12-31"), "abcd"]]
df = spark.createDataFrame(data, schema=["r_id0", "batch_date", "from", "until", "hash"])
Identify those values of r_id0 that contain only consecutive months:
from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy("r_id0").orderBy("batch_date")

# adj = 1 when the row's batch_date falls in the month directly after the previous
# row's batch_date (or when it is the first row of its r_id0 group), otherwise null
df2 = df.withColumn("prev_bd_month", F.month(F.lag("batch_date").over(w))) \
    .withColumn("prev_bd_year", F.year(F.lag("batch_date").over(w))) \
    .withColumn("adj", F.when((F.year("batch_date").eqNullSafe(F.col("prev_bd_year"))
                               & (F.month("batch_date") - F.col("prev_bd_month") == 1))
                              | ((F.year("batch_date") - F.col("prev_bd_year") == 1)
                                 & (F.month("batch_date") == 1) & (F.col("prev_bd_month") == 12))
                              | F.col("prev_bd_year").isNull(), 1).otherwise(None)) \
    .groupBy("r_id0") \
    .agg(F.count("*").alias("count_all"), F.sum("adj").alias("count_adj")) \
    .withColumn("all_adj", F.col("count_all") == F.col("count_adj")) \
    .drop("count_all", "count_adj") \
    .join(df, "r_id0") \
    .cache()  # df2 is used twice in the next step, so caching avoids recomputation
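The table below can be reproduced with something like the following (the orderBy is only there to make the output deterministic):
df2.orderBy("r_id0", "batch_date").show()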
Intermediate result:
+-----+-------+----------+----------+----------+-----------------+
|r_id0|all_adj|batch_date| from| until| hash|
+-----+-------+----------+----------+----------+-----------------+
| 0| false|2020-01-31|2020-01-31|2020-02-27|b10192333e9c61a40|
| 0| false|2020-04-30|2020-04-30|9999-12-31|b10192333e9c61a40|
| 0| false|2020-05-01|2020-04-30|9999-12-31|b10192333e9c61a40|
| 1| true|2019-12-31|2020-01-31|9999-12-31| 63b3e8201cd417bb|
| 1| true|2020-01-31|2020-01-31|9999-12-31| 63b3e8201cd417bb|
| 1| true|2020-02-28|2020-02-28|9999-12-31| 63b3e8201cd417bb|
| 1| true|2020-03-31|2020-03-31|9999-12-31| 63b3e8201cd417bb|
| 2| true|2020-03-31|2020-03-31|9999-12-31| abcd|
+-----+-------+----------+----------+----------+-----------------+
Group the rows with all_adj == true and keep all other rows unchanged:
df3 = df2.filter("all_adj == true") \
    .groupBy("r_id0") \
    .agg(F.min("batch_date").alias("batch_date"),
         F.expr("min_by(from, batch_date)").alias("from"),
         F.max("until").alias("until"),
         F.expr("min_by(hash, batch_date)").alias("hash")) \
    .union(df2.filter("all_adj == false").drop("all_adj"))
Result:
+-----+----------+----------+----------+-----------------+
|r_id0|batch_date| from| until| hash|
+-----+----------+----------+----------+-----------------+
| 1|2019-12-31|2020-01-31|9999-12-31| 63b3e8201cd417bb|
| 2|2020-03-31|2020-03-31|9999-12-31| abcd|
| 0|2020-01-31|2020-01-31|2020-02-27|b10192333e9c61a40|
| 0|2020-04-30|2020-04-30|9999-12-31|b10192333e9c61a40|
| 0|2020-05-01|2020-04-30|9999-12-31|b10192333e9c61a40|
+-----+----------+----------+----------+-----------------+
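A side note: the positional union above relies on both sides having the same column order, and min_by/max_by are also exposed as Python functions from Spark 3.3 onward. A sketch of the same step written that way could look like this (untested here; the question's desc column is assumed to take the value of the earliest batch):
from pyspark.sql import functions as F

# sketch assuming Spark >= 3.3 (F.min_by available as a Python function)
df3 = (
    df2.filter("all_adj == true")
    .groupBy("r_id0")
    .agg(
        F.min("batch_date").alias("batch_date"),
        F.min_by("from", "batch_date").alias("from"),   # "from" of the earliest batch
        F.max("until").alias("until"),
        F.min_by("hash", "batch_date").alias("hash"),
        # a column like the question's desc would be aggregated the same way:
        # F.min_by("desc", "batch_date").alias("desc"),
    )
    .unionByName(df2.filter("all_adj == false").drop("all_adj"))
)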
Upvotes: 1