DxG

Reputation: 209

Spark drop duplicates and select row with max value

I'm trying to drop duplicates based on column1 and, for each value of column1, keep the row with the maximum value in column2. Column2 holds years (2019, 2020, etc.) and is of type String. My current solution casts column2 to an integer and selects the max value:

    Dataset<Row> ds; // the dataset with column1, column2 (year), column3, etc.
    Dataset<Row> newDs = ds.withColumn("column2Int", col("column2").cast(DataTypes.IntegerType));
    newDs = newDs.groupBy("column1").max("column2Int"); // drops all other columns

This approach drops all the other columns of the original dataset 'ds' during the groupBy, so I have to join 'ds' with 'newDs' to get the original columns back. Casting the String column to an integer also looks like an inefficient workaround.
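The following is a plain-Java sketch of what the groupBy-then-join workaround computes. It does not use Spark: in-memory lists stand in for the datasets, and `Row` and `latestPerKey` are hypothetical names. It also shows a subtlety of the join. If two rows with the same column1 share the max year, both come back.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java emulation (no Spark) of the workaround: groupBy("column1").max(year),
// then join the result back to the original rows. Row and latestPerKey are hypothetical names.
class GroupByJoinSketch {
    static class Row {
        final String column1;
        final String column2; // year stored as a String, as in the question
        final String column3;

        Row(String column1, String column2, String column3) {
            this.column1 = column1;
            this.column2 = column2;
            this.column3 = column3;
        }

        @Override
        public String toString() {
            return "(" + column1 + ", " + column2 + ", " + column3 + ")";
        }
    }

    static List<Row> latestPerKey(List<Row> ds) {
        // "groupBy + max": only column1 and the max year survive this step.
        Map<String, Integer> maxYear = new HashMap<>();
        for (Row r : ds) {
            maxYear.merge(r.column1, Integer.parseInt(r.column2), Integer::max); // the cast to integer
        }
        // "join back": keep each original row whose year equals the max for its column1.
        List<Row> result = new ArrayList<>();
        for (Row r : ds) {
            if (Integer.parseInt(r.column2) == maxYear.get(r.column1)) {
                result.add(r);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> ds = Arrays.asList(
            new Row("a", "2019", "x"),
            new Row("a", "2020", "y"),
            new Row("b", "2018", "z"),
            new Row("b", "2018", "w"));
        // Both "b" rows survive because they tie on the max year.
        System.out.println(latestPerKey(ds)); // [(a, 2020, y), (b, 2018, z), (b, 2018, w)]
    }
}
```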

Is it possible to drop the duplicates and keep the row with the larger string value directly from the original dataset?

Upvotes: 0

Views: 1585

Answers (1)

Chitral Verma

Reputation: 2855

This is a classic de-duplication problem, and you'll need a Window + rank + filter combination for it.

I'm not very familiar with the Java syntax, but the code should look something like the following:

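    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.expressions.Window;
    import org.apache.spark.sql.expressions.WindowSpec;
    import org.apache.spark.sql.functions;
    import org.apache.spark.sql.types.DataTypes;
    
    Dataset<Row> df = ds; // the question's dataset: column1, column2 (year as String), column3, ...
    
    // One window per value of column1, with rows sorted by the integer year, largest first.
    WindowSpec windowSpec = Window.partitionBy("column1").orderBy(functions.desc("column2Int"));
    
    Dataset<Row> result =
        df.withColumn("column2Int", functions.col("column2").cast(DataTypes.IntegerType))
            // rank() gives tied rows the same rank; row_number() keeps exactly one row per column1
            .withColumn("rank", functions.rank().over(windowSpec))
            .where("rank == 1")
            .drop("rank", "column2Int"); // remove the helper columns, keeping the original schema

    result.show(false);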
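The question also asks whether the cast is needed at all. Spark orders StringType columns lexicographically. If every column2 value is a four-digit year, that order agrees with numeric order, so ordering by `functions.desc("column2")` directly would most likely pick the same row without the cast. The cast only matters when the values differ in width. The following plain-Java check illustrates this under that assumption; `maxAsString` and `maxAsInt` are hypothetical helper names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Compares lexicographic (String) ordering with integer ordering for year values.
// maxAsString and maxAsInt are hypothetical helpers used only for illustration.
class YearOrderingCheck {
    static String maxAsString(List<String> years) {
        return Collections.max(years); // lexicographic, like ordering an uncast String column
    }

    static String maxAsInt(List<String> years) {
        int best = Integer.MIN_VALUE;
        for (String y : years) {
            best = Math.max(best, Integer.parseInt(y)); // numeric, like ordering the cast column
        }
        return String.valueOf(best);
    }

    public static void main(String[] args) {
        List<String> fourDigit = Arrays.asList("2019", "2021", "2020");
        System.out.println(maxAsString(fourDigit) + " " + maxAsInt(fourDigit)); // 2021 2021

        // With mixed widths the two orders disagree: '9' sorts after '2'.
        List<String> mixedWidth = Arrays.asList("999", "2020");
        System.out.println(maxAsString(mixedWidth) + " " + maxAsInt(mixedWidth)); // 999 2020
    }
}
```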

Overview of what happens:

  1. The integer-cast column is added to the df so it can be used for sorting.
  2. Windows (partitions) are formed in the dataset based on the value of column1.
  3. Within each window, rows are sorted on the integer-cast column in descending order, since you want the max.
  4. Ranks, similar to row numbers, are assigned to the rows in each window.
  5. Only rows with rank 1 are kept (the max value, because the ordering is descending). Rows tied for the max year all get rank 1, so all of them are kept.
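The five steps can be emulated in plain Java to see how `rank()` and `row_number()` differ on ties. This is a sketch without Spark. `Row` and `topPerKey` are hypothetical names, and a stable in-memory sort stands in for Spark's window ordering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java emulation (no Spark) of partitionBy("column1") + orderBy(desc year) + rank()/row_number() + filter.
// Row and topPerKey are hypothetical names used only for illustration.
class WindowRankSketch {
    static class Row {
        final String column1;
        final String column2; // year stored as a String
        final String column3;

        Row(String column1, String column2, String column3) {
            this.column1 = column1;
            this.column2 = column2;
            this.column3 = column3;
        }

        @Override
        public String toString() {
            return "(" + column1 + ", " + column2 + ", " + column3 + ")";
        }
    }

    // useRowNumber == false mimics rank(): rows tied for the max year all get rank 1.
    // useRowNumber == true mimics row_number(): exactly one row per column1 gets 1.
    static List<Row> topPerKey(List<Row> rows, boolean useRowNumber) {
        // Step 2: partition the rows by column1 (LinkedHashMap keeps first-seen key order).
        Map<String, List<Row>> partitions = new LinkedHashMap<>();
        for (Row r : rows) {
            partitions.computeIfAbsent(r.column1, k -> new ArrayList<>()).add(r);
        }

        List<Row> result = new ArrayList<>();
        for (List<Row> partition : partitions.values()) {
            // Steps 1 and 3: sort by the year cast to int, descending (List.sort is stable).
            List<Row> sorted = new ArrayList<>(partition);
            sorted.sort(Comparator.comparingInt((Row r) -> Integer.parseInt(r.column2)).reversed());

            int rank = 0;
            for (int i = 0; i < sorted.size(); i++) {
                Row current = sorted.get(i);
                boolean tiedWithPrevious = i > 0
                    && Integer.parseInt(sorted.get(i - 1).column2) == Integer.parseInt(current.column2);
                // Step 4: rank() repeats the previous rank on ties; row_number() always counts up.
                if (useRowNumber || !tiedWithPrevious) {
                    rank = i + 1;
                }
                // Step 5: keep only the rows ranked 1.
                if (rank == 1) {
                    result.add(current);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
            new Row("a", "2020", "x"),
            new Row("a", "2020", "y"),
            new Row("a", "2019", "z"),
            new Row("b", "2018", "w"));
        System.out.println(topPerKey(rows, false)); // [(a, 2020, x), (a, 2020, y), (b, 2018, w)]
        System.out.println(topPerKey(rows, true));  // [(a, 2020, x), (b, 2018, w)]
    }
}
```

In Spark, `row_number()` does not guarantee which of two tied rows it keeps unless the window's `orderBy` includes a tie-breaking column. The stable sort here simply preserves input order.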

Upvotes: 1
