albertovpd

Reputation: 137

PySpark: remove only consecutive duplicated rows

After some iterations and joins by batch_date, I have a dataframe. Let's focus on r_id0==0 and r_id0==1:

+-----+----------+----------+----------+--------------------+--------------------+
|r_id0|batch_date|      from|     until|                desc|                hash|
+-----+----------+----------+----------+--------------------+--------------------+
|    0|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 1|b10192333e9c61a40...|
|    0|2020-04-30|2020-04-30|9999-12-31|  Esto es un campo 1|b10192333e9c61a40...|
|    1|2020-01-31|2020-01-31|9999-12-31|  Esto es un campo 1|b63b3e8201cd417bb...|
|    1|2020-02-28|2020-02-28|9999-12-31|  Esto es un campo 1|b63b3e8201cd417bb...|
|    1|2020-03-31|2020-03-31|9999-12-31|  Esto es un campo 1|b63b3e8201cd417bb...|
|    2|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 2|b31c138a4b6a96169...|
|    2|2020-02-28|2020-02-28|2020-03-30|  Esto es un campo 4|fa1cfe1eaf9b14f88...|
|    2|2020-03-31|2020-03-31|2020-04-29|  Esto es un campo 3|e4ffd0cd4a6cf1193...|
|    2|2020-04-30|2020-04-30|9999-12-31| Esto es un campo 23|0fdb24b8fcf8603ee...|
|    3|2020-02-28|2020-02-28|2020-03-30|  Esto es un campo 3|a3a6870ca9b42ad06...|
|    3|2020-03-31|2020-03-31|2020-04-29|  Esto es un campo 2|18b24f88271e99618...|
|    4|2020-03-31|2020-03-31|2020-04-29|  Esto es un campo 5|2fe50db0156cfc909...|
|    6|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 6|7c6d329b73d9de59f...|
|    6|2020-04-30|2020-04-30|9999-12-31|Esto es un campo 77|d70c9340f83167e95...|
+-----+----------+----------+----------+--------------------+--------------------+

I need to remove rows that share the same hash value, but only the ones belonging to consecutive months in the batch_date column. In that case, I keep everything from the first occurrence of the duplicate, except for the until column, which takes the value of the last occurrence. Non-consecutive duplicates must remain as they are.

Example:

+-----+----------+----------+----------+--------------------+--------------------+
|r_id0|batch_date|      from|     until|                desc|                hash|
+-----+----------+----------+----------+--------------------+--------------------+
|    0|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 1|b10192333e9c61a40...|
|    0|2020-04-30|2020-04-30|9999-12-31|  Esto es un campo 1|b10192333e9c61a40...|
|    1|2020-01-31|2020-01-31|9999-12-31|  Esto es un campo 1|b63b3e8201cd417bb...|
|    2|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 2|b31c138a4b6a96169...|
etc etc... 

I wrote a function that does what I want for r_id0==1, but it also modifies the r_id0==0 case:

from pyspark.sql import functions as F

def removing_duplicates2(final_df):
    '''
    Removing duplicates. It takes everything from the first time they appear
    and the last "until" value.
    '''
    # build the aggregation expressions for every column except "hash",
    # which is our groupBy column
    # note: first/last in an aggregation give no ordering guarantee,
    # so the result depends on how the rows happen to be arranged
    iterating_columns = final_df.columns
    iterating_columns.remove("hash")
    exprs = [F.first(x).alias(x) if x != "until" else F.last(x).alias(x)
             for x in iterating_columns]
    return (
        final_df.groupBy("hash").agg(*exprs)
            .select(final_df.columns)
    )

This is the result; because groupBy("hash") collapses every row with the same hash, whether or not the months are consecutive, it destroys the r_id0==0 case:

+-----+----------+----------+----------+--------------------+--------------------+
|r_id0|batch_date|      from|     until|                desc|                hash|
+-----+----------+----------+----------+--------------------+--------------------+
|    0|2020-01-31|2020-01-31|9999-12-31|  Esto es un campo 1|b10192333e9c61a40...|
|    1|2020-01-31|2020-01-31|9999-12-31|  Esto es un campo 1|b63b3e8201cd417bb...|
|    2|2020-01-31|2020-01-31|2020-02-27|  Esto es un campo 2|b31c138a4b6a96169...|
etc etc

So, I'm quite puzzled. I have to do it with PySpark, and this example will be used on a huge, damn huge table partitioned by batch_date. I strongly believe every loop I create is a step towards servers exploding and firefighters blaming me, and I'm already using one, for iterating over batch_date.

Apologies for the long description. Any advice or suggestion is more than welcome.

Thanks!

Upvotes: 1

Views: 406

Answers (1)

werner

Reputation: 14845

The idea is to first identify, using a Window, the groups that contain only consecutive months, and then aggregate just the rows found in that first step while leaving all other rows unchanged.

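The snippets below assume an existing SparkSession called spark, as in the pyspark shell. Outside a shell you would create one first; a minimal local setup (names are illustrative, not part of the original answer):

from pyspark.sql import SparkSession

# Local session just for trying the example; in a real job the session
# usually comes from the runtime (pyspark shell, spark-submit, notebook)
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("consecutive-duplicates") \
    .getOrCreate()
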
Create some test data:

from datetime import date
data = [[0, date.fromisoformat("2020-01-31"), date.fromisoformat("2020-01-31"), date.fromisoformat("2020-02-27"), "b10192333e9c61a40"],
    [0, date.fromisoformat("2020-04-30"), date.fromisoformat("2020-04-30"), date.fromisoformat("9999-12-31"), "b10192333e9c61a40"],
    [0, date.fromisoformat("2020-05-01"), date.fromisoformat("2020-04-30"), date.fromisoformat("9999-12-31"), "b10192333e9c61a40"],
    [1, date.fromisoformat("2019-12-31"), date.fromisoformat("2020-01-31"), date.fromisoformat("9999-12-31"), "63b3e8201cd417bb"],
    [1, date.fromisoformat("2020-01-31"), date.fromisoformat("2020-01-31"), date.fromisoformat("9999-12-31"), "63b3e8201cd417bb"],
    [1, date.fromisoformat("2020-02-28"), date.fromisoformat("2020-02-28"), date.fromisoformat("9999-12-31"), "63b3e8201cd417bb"],
    [1, date.fromisoformat("2020-03-31"), date.fromisoformat("2020-03-31"), date.fromisoformat("9999-12-31"), "63b3e8201cd417bb"],
    [2, date.fromisoformat("2020-03-31"), date.fromisoformat("2020-03-31"), date.fromisoformat("9999-12-31"), "abcd"]]

df=spark.createDataFrame(data, schema=["r_id0","batch_date","from","until","hash"])

Identify those values of r_id0 that contain only consecutive months:

from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy("r_id0").orderBy("batch_date")
df2 = df.withColumn("prev_bd_month", F.month(F.lag("batch_date").over(w))) \
    .withColumn("prev_bd_year", F.year(F.lag("batch_date").over(w))) \
    .withColumn("adj", F.when((F.year("batch_date").eqNullSafe(F.col("prev_bd_year"))
                              & (F.month("batch_date") - F.col("prev_bd_month") == 1))
                              | ((F.year("batch_date") - F.col("prev_bd_year") == 1)
                              & (F.month("batch_date") == 1) & (F.col("prev_bd_month") == 12))
                              | F.col("prev_bd_year").isNull(), 1).otherwise(None)) \
    .groupBy("r_id0") \
    .agg(F.count("*").alias("count_all"), F.sum("adj").alias("count_adj")) \
    .withColumn("all_adj", F.col("count_all") == F.col("count_adj")) \
    .drop("count_all", "count_adj") \
    .join(df, "r_id0") \
    .cache()  # df2 is used twice below, so caching avoids recomputing the window

Intermediate result:

+-----+-------+----------+----------+----------+-----------------+
|r_id0|all_adj|batch_date|      from|     until|             hash|
+-----+-------+----------+----------+----------+-----------------+
|    0|  false|2020-01-31|2020-01-31|2020-02-27|b10192333e9c61a40|
|    0|  false|2020-04-30|2020-04-30|9999-12-31|b10192333e9c61a40|
|    0|  false|2020-05-01|2020-04-30|9999-12-31|b10192333e9c61a40|
|    1|   true|2019-12-31|2020-01-31|9999-12-31| 63b3e8201cd417bb|
|    1|   true|2020-01-31|2020-01-31|9999-12-31| 63b3e8201cd417bb|
|    1|   true|2020-02-28|2020-02-28|9999-12-31| 63b3e8201cd417bb|
|    1|   true|2020-03-31|2020-03-31|9999-12-31| 63b3e8201cd417bb|
|    2|   true|2020-03-31|2020-03-31|9999-12-31|             abcd|
+-----+-------+----------+----------+----------+-----------------+

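As a side note, the consecutive-month check can be written more compactly by truncating both dates to the first day of their month and comparing with months_between. A sketch of an equivalent "adj" computation (df_alt and prev_bd are illustrative names, not part of the original answer):

# 1 when there is no previous row, or the previous batch_date lies in
# exactly the preceding month; trunc(..., "month") maps both dates to the
# first of their month, so months_between returns a whole number
df_alt = df.withColumn("prev_bd", F.lag("batch_date").over(w)) \
    .withColumn("adj", F.when(
        F.col("prev_bd").isNull()
        | (F.months_between(F.trunc("batch_date", "month"),
                            F.trunc("prev_bd", "month")) == 1), 1))
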
Group the rows with all_adj == true and keep all other rows unchanged (min_by requires Spark 3.0 or later):

df3 = df2.filter("all_adj == true") \
    .groupBy("r_id0") \
    .agg(F.min("batch_date").alias("batch_date"),
         F.expr("min_by(from, batch_date)").alias("from"),  # "from" of the earliest row
         F.max("until").alias("until"),                     # latest "until"
         F.expr("min_by(hash, batch_date)").alias("hash")) \
    .union(df2.filter("all_adj == false").drop("all_adj"))

Result:

+-----+----------+----------+----------+-----------------+
|r_id0|batch_date|      from|     until|             hash|
+-----+----------+----------+----------+-----------------+
|    1|2019-12-31|2020-01-31|9999-12-31| 63b3e8201cd417bb|
|    2|2020-03-31|2020-03-31|9999-12-31|             abcd|
|    0|2020-01-31|2020-01-31|2020-02-27|b10192333e9c61a40|
|    0|2020-04-30|2020-04-30|9999-12-31|b10192333e9c61a40|
|    0|2020-05-01|2020-04-30|9999-12-31|b10192333e9c61a40|
+-----+----------+----------+----------+-----------------+
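
Note that union resolves columns by position, so this relies on the aggregated side producing its columns in the same order as df2 without all_adj. If that ordering cannot be guaranteed, unionByName is the safer choice; the last line would become:

    .unionByName(df2.filter("all_adj == false").drop("all_adj"))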

Upvotes: 1
