Reputation: 141
I have a very specific requirement for outlier treatment in a Spark DataFrame (Scala): I want to treat only the first (largest) outlier group and reduce it to the size of the second group.
Input:
+------+-----------------+------+
|market|responseVariable |blabla|
+------+-----------------+------+
|A |r1 | da |
|A |r1 | ds |
|A |r1 | s |
|A |r1 | f |
|A |r1 | v |
|A |r2 | s |
|A |r2 | s |
|A |r2 | c |
|A |r3 | s |
|A |r3 | s |
|A |r4 | s |
|A |r5 | c |
|A |r6 | s |
|A |r7 | s |
|A |r8 | s |
+------+-----------------+------+
Now, per market and responseVariable, I want to treat just the first outlier.
Counts grouped by market and responseVariable:
+------+-----------------+------+
|market|responseVariable |count |
+------+-----------------+------+
|A |r1 | 5 |
|A |r2 | 3 |
|A |r3 | 2 |
|A |r4 | 1 |
|A |r5 | 1 |
|A |r6 | 1 |
|A |r7 | 1 |
|A |r8 | 1 |
+------+-----------------+------+
I want to treat the outlier for the group market=A and responseVariable=r1 in the actual dataset: randomly remove records from group 1 until its count equals that of group 2.
Expected output:
+------+-----------------+------+
|market|responseVariable |blabla|
+------+-----------------+------+
|A |r1 | da |
|A |r1 | s |
|A |r1 | v |
|A |r2 | s |
|A |r2 | s |
|A |r2 | c |
|A |r3 | s |
|A |r3 | s |
|A |r4 | s |
|A |r5 | c |
|A |r6 | s |
|A |r7 | s |
|A |r8 | s |
+------+-----------------+------+
Grouped counts after treatment:
+------+-----------------+------+
|market|responseVariable |count |
+------+-----------------+------+
|A |r1 | 3 |
|A |r2 | 3 |
|A |r3 | 2 |
|A |r4 | 1 |
|A |r5 | 1 |
|A |r6 | 1 |
|A |r7 | 1 |
|A |r8 | 1 |
+------+-----------------+------+
I want to repeat this for multiple markets.
Upvotes: 0
Views: 255
Reputation: 41957
You will first need the names and counts of the two largest groups, which can be obtained as below:
import org.apache.spark.sql.functions._

// take the two largest (market, responseVariable) groups, keeping each
// group's responseVariable name and row count
val first_two_values = df.groupBy("market", "responseVariable")
  .agg(count("blabla").as("count"))
  .orderBy($"count".desc)
  .take(2)
  .map(row => (row.getString(1), row.getLong(2)))
  .toList

val rowsToFilter = first_two_values(0)._1   // responseVariable of the largest group
val countsToFilter = first_two_values(1)._2 // count of the second-largest group
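With the sample data above, first_two_values should come out as List(("r1", 5), ("r2", 3)), so rowsToFilter is "r1" and countsToFilter is 3.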
After you know the first two groups, you need to filter out the extra rows from the first group, which can be done by generating a row_number per group and dropping the rows beyond the second group's count, as below:
import org.apache.spark.sql.expressions._

// number the rows within each (market, responseVariable) group
def windowSpec = Window.partitionBy("market", "responseVariable").orderBy("blabla")

df.withColumn("rank", row_number().over(windowSpec))
  // keep a row unless it belongs to the largest group and lies beyond the second group's count
  .withColumn("keep", when(col("rank") > countsToFilter && col("responseVariable") === rowsToFilter, false).otherwise(true))
  .filter(col("keep"))
  .drop("rank", "keep")
  .show(false)
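Note that ordering the window by blabla makes the removal deterministic. If you really want the rows to be dropped at random, as the question describes, one option (my suggestion, not required by the question) is to order the window by rand() instead:

// optional: pick the rows to drop at random rather than by blabla order
def windowSpec = Window.partitionBy("market", "responseVariable").orderBy(rand())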
This should fulfill your requirement.
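Since the question mentions repeating this for multiple markets, below is a minimal sketch (my own variant, not part of the answer above) that applies the same capping to every market in one pass using only window functions. It assumes the column names from the question (market, responseVariable, blabla); the names groupWindow, marketWindow and treated are just illustrative.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val groupWindow  = Window.partitionBy("market", "responseVariable")
val marketWindow = Window.partitionBy("market")

val treated = df
  .withColumn("rn", row_number().over(groupWindow.orderBy("blabla")))  // position of a row inside its group
  .withColumn("groupCount", count(lit(1)).over(groupWindow))           // size of the row's group
  .withColumn("maxCount", max("groupCount").over(marketWindow))        // largest group size in the market
  .withColumn("secondCount",                                           // second-largest group size in the market
    max(when(col("groupCount") < col("maxCount"), col("groupCount"))).over(marketWindow))
  // keep all rows of the smaller groups; cap the largest group at the second-largest size
  // (coalesce keeps everything when a market only has a single group)
  .filter(col("groupCount") < col("maxCount") ||
    col("rn") <= coalesce(col("secondCount"), col("maxCount")))
  .drop("rn", "groupCount", "maxCount", "secondCount")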
Upvotes: 1