user3802925

Reputation: 81

SPARK DataFrame: Remove MAX value in a group

My data is like:

id | val
---------------- 
a1 |  10
a1 |  20
a2 |  5
a2 |  7
a2 |  2

I am trying to delete the row that has the MAX(val) in each group when grouping on "id".

Result should be like:

id | val
---------------- 
a1 |  10
a2 |  5
a2 |  2

I am using Spark DataFrames and a SQLContext. I need something like:

DataFrame df = sqlContext.sql("SELECT * FROM jsontable WHERE (id, val) NOT IN (SELECT id, MAX(val) FROM jsontable GROUP BY id)");

How can I do that?
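For reference, the same anti-join idea can be written with the DataFrame API instead of a subquery. A minimal Scala sketch, assuming df holds the table above and a Spark version (2.0+) where the left_anti join type is available:

import org.apache.spark.sql.functions.max

// Per-group maximum of "val", renamed so it lines up with df for the join
val maxPerGroup = df.groupBy("id").agg(max("val").as("val"))

// Anti-join: keep only the rows of df that do NOT match an (id, max val) pair
val result = df.join(maxPerGroup, Seq("id", "val"), "left_anti")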

Upvotes: 7

Views: 2210

Answers (3)

user3802925

Reputation: 81

Below is a Java implementation of marios' Scala approach:

DataFrame df = sqlContext.read().json(input);

// Per-group maximum of "val"; Spark names the aggregate column "max(val)"
DataFrame dfMaxRaw = df.groupBy("id").max("val");
DataFrame dfMax = dfMaxRaw.select(
    dfMaxRaw.col("id").as("max_id"), dfMaxRaw.col("max(val)").as("max_val")
);

// Join each row with the maximum of its group
DataFrame combineMaxWithData = df.join(dfMax, df.col("id")
    .equalTo(dfMax.col("max_id")));

// Keep only the rows whose value differs from the group maximum
// (the id == max_id condition is already guaranteed by the join)
DataFrame finalResult = combineMaxWithData.filter(
    combineMaxWithData.col("id").equalTo(combineMaxWithData.col("max_id"))
        .and(combineMaxWithData.col("val").notEqual(combineMaxWithData.col("max_val")))
);
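Note that finalResult still carries the max_id and max_val columns added by the join; if only the original schema is needed, a final select on "id" and "val" will drop them.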

Upvotes: 1

Daniel de Paula

Reputation: 17872

You can do that using dataframe operations and Window functions. Assuming you have your data in the dataframe df1:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val maxOnWindow = max(col("val")).over(Window.partitionBy(col("id")))
val df2 = df1
  .withColumn("max", maxOnWindow)
  .where(col("val") < col("max"))
  .select("id", "val")

In Java, the equivalent would be something like:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.expressions.Window;

Column maxOnWindow = max(col("val")).over(Window.partitionBy("id"));
DataFrame df2 = df1
    .withColumn("max", maxOnWindow)
    .where(col("val").lt(col("max")))
    .select("id", "val");
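As a quick check, here is a hypothetical run of the Scala version against the data from the question (assuming a shell session where the snippet above has been executed and sqlContext.implicits._ is imported; row order in the output is not guaranteed):

import sqlContext.implicits._

val df1 = Seq(("a1", 10), ("a1", 20), ("a2", 5), ("a2", 7), ("a2", 2)).toDF("id", "val")
val df2 = df1
  .withColumn("max", maxOnWindow)
  .where(col("val") < col("max"))
  .select("id", "val")

df2.show()
// Expected rows (in some order): (a1, 10), (a2, 5), (a2, 2)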

Here's a nice article about window functions: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

Upvotes: 3

marios

Reputation: 8996

Here is how to do this using RDDs and a more Scala-flavored approach:

// Let's first get the data in key-value pair format
val data = sc.makeRDD( Seq( ("a",20), ("a", 1), ("a",8), ("b",3), ("b",10), ("b",9) ) )

// Next let's find the max value from each group
val maxGroups = data.reduceByKey( Math.max(_,_) )

// We join the max in the group with the original data
val combineMaxWithData = maxGroups.join(data)

// Finally we filter out the values that agree with the max
val finalResults = combineMaxWithData
  .filter{ case (gid, (max, curVal)) => max != curVal }
  .map{ case (gid, (max, curVal)) => (gid, curVal) }


println( finalResults.collect.toList )
>List((a,1), (a,8), (b,3), (b,9))
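If the result needs to go back into a DataFrame afterwards, a minimal sketch (assuming sqlContext.implicits._ is in scope; the column names are just illustrative):

import sqlContext.implicits._

// Turn the remaining (id, val) pairs back into a DataFrame with named columns
val finalDf = finalResults.toDF("id", "val")
finalDf.show()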

Upvotes: 0
