Nishikant Jain

Reputation: 141

very specific requirement for outlier treatment in Spark Dataframe

I have a very specific requirement for outlier treatment in a Spark DataFrame (Scala): I want to treat just the first (largest) outlier group and make its count equal to the second group's.

Input:

+------+-----------------+------+
|market|responseVariable |blabla|
+------+-----------------+------+
|A     |r1               |  da  |   
|A     |r1               |  ds  |
|A     |r1               |  s   |
|A     |r1               |  f   |
|A     |r1               |  v   |
|A     |r2               |  s   |
|A     |r2               |  s   |
|A     |r2               |  c   |
|A     |r3               |  s   |
|A     |r3               |  s   |
|A     |r4               |  s   |
|A     |r5               |  c   |
|A     |r6               |  s   |
|A     |r7               |  s   |
|A     |r8               |  s   |
+------+-----------------+------+

Now, per market and responseVariable, I want to treat just the first outlier.

Grouped by market and responseVariable:

+------+-----------------+------+
|market|responseVariable |count |
+------+-----------------+------+
|A     |r1               |  5   |   
|A     |r2               |  3   |
|A     |r3               |  2   |
|A     |r4               |  1   |
|A     |r5               |  1   |
|A     |r6               |  1   |
|A     |r7               |  1   |
|A     |r8               |  1   |
+------+-----------------+------+
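
For reference, these grouped counts can be produced with a standard aggregation (a minimal sketch, assuming the DataFrame is named df):

import org.apache.spark.sql.functions.col
df.groupBy("market", "responseVariable")
  .count()
  .orderBy(col("count").desc)
  .show(false)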

I want to treat the outlier for the group market=A and responseVariable=r1 in the actual dataset: randomly remove records from group 1 until its count equals that of group 2.

Expected output:

+------+-----------------+------+
|market|responseVariable |blabla|
+------+-----------------+------+
|A     |r1               |  da  |   
|A     |r1               |  s   |
|A     |r1               |  v   |
|A     |r2               |  s   |
|A     |r2               |  s   |
|A     |r2               |  c   |
|A     |r3               |  s   |
|A     |r3               |  s   |
|A     |r4               |  s   |
|A     |r5               |  c   |
|A     |r6               |  s   |
|A     |r7               |  s   |
|A     |r8               |  s   |
+------+-----------------+------+

Grouped by market and responseVariable after treatment:

+------+-----------------+------+
|market|responseVariable |count |
+------+-----------------+------+
|A     |r1               |  3   |   
|A     |r2               |  3   |
|A     |r3               |  2   |
|A     |r4               |  1   |
|A     |r5               |  1   |
|A     |r6               |  1   |
|A     |r7               |  1   |
|A     |r8               |  1   |
+------+-----------------+------+

I want to repeat this for multiple markets.

Upvotes: 0

Views: 255

Answers (1)

Ramesh Maharjan

Reputation: 41957

You will need to know the counts and names of the first and second groups, which can be obtained as below:

import org.apache.spark.sql.functions._

// take the two largest (market, responseVariable) groups, keeping (responseVariable, count)
val first_two_values = df.groupBy("market", "responseVariable").agg(count("blabla").as("count"))
  .orderBy(col("count").desc).take(2)
  .map(row => (row.getString(1), row.getLong(2))).toList
val rowsToFilter = first_two_values(0)._1   // responseVariable of the largest group
val countsToFilter = first_two_values(1)._2 // count of the second-largest group
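
With the sample data above, first_two_values is List(("r1", 5), ("r2", 3)), so rowsToFilter is "r1" (the overrepresented group) and countsToFilter is 3 (the target count).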

Once you know the first two groups, you need to filter the extra rows out of the first group. This can be done by generating a row number within each group and dropping every row whose number exceeds the second group's count:

import org.apache.spark.sql.expressions._

val windowSpec = Window.partitionBy("market", "responseVariable").orderBy("blabla")

df.withColumn("rank", row_number().over(windowSpec))
  // drop rows of the largest group whose rank exceeds the second group's count
  .filter(!(col("responseVariable") === rowsToFilter && col("rank") > countsToFilter))
  .drop("rank")
  .show(false)
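
Note that ordering the window by blabla removes rows deterministically. Since the question asks for random removal, one option (a sketch on top of the answer's approach, not part of the original) is to order the window by rand() instead:

import org.apache.spark.sql.functions.rand
// rank rows within each group in random order so the dropped rows are chosen at random;
// use rand(seed) if the result needs to be reproducible
val randomSpec = Window.partitionBy("market", "responseVariable").orderBy(rand())
df.withColumn("rank", row_number().over(randomSpec))
  .filter(!(col("responseVariable") === rowsToFilter && col("rank") > countsToFilter))
  .drop("rank")
  .show(false)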

That should fulfill your requirement.

Upvotes: 1
