anup

Reputation: 11

Identifying decreases in values in Spark (outliers)

I have a large data set with millions of records, which looks something like this:

Movie Likes Comments Shares Views 
 A     100     10      20     30 
 A     102     11      22     35 
 A     104     12      25     45 
 A     *103*   13     *24*    50 
 B     200     10      20     30 
 B     205    *9*      21     35 
 B     *203*   12      29     42 
 B     210     13     *23*   *39*

Likes, comments, etc. are rolling totals, so they are supposed to increase. If any of them drops for a movie, that row is bad data and needs to be identified.

My initial thought was to group by movie and then sort within each group. I am using DataFrames in Spark 1.6 for processing, and this does not seem achievable, as there is no sorting within grouped data in a DataFrame.

Building something for outlier detection could be another approach, but because of time constraints I have not explored it yet.

Is there any way I can achieve this?

Thanks!

Upvotes: 0

Views: 285

Answers (1)

ImDarrenG

Reputation: 2345

You can use the lag window function to bring the previous values into scope:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

// Partition by movie and order by whichever field defines row order
// ('maybesometemporalfield is a placeholder for your ordering column).
val windowSpec = Window.partitionBy('Movie).orderBy('maybesometemporalfield)

dataset.withColumn("lag_likes", lag('Likes, 1) over windowSpec)
       .withColumn("lag_comments", lag('Comments, 1) over windowSpec)
       .show

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-functions.html#lag
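Once the lagged values are in scope, a filter comparing each value to its lag will surface the bad rows. A minimal sketch building on the snippet above (the filter expression is my own; lag returns null for the first row of each partition, hence the null checks):

// Keep only rows where a rolling total dropped relative to the previous row.
dataset.withColumn("lag_likes", lag('Likes, 1) over windowSpec)
       .withColumn("lag_comments", lag('Comments, 1) over windowSpec)
       .filter("(lag_likes IS NOT NULL AND Likes < lag_likes) OR " +
               "(lag_comments IS NOT NULL AND Comments < lag_comments)")
       .show

The same pattern extends to Shares and Views.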

Another approach would be to assign a row number (if there isn't one already), then join each row to its previous row on that number, allowing you to do the comparison.
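A rough sketch of that idea, reusing the same windowSpec for ordering (the p_-prefixed column names are placeholders of mine, and only Likes is compared here):

import org.apache.spark.sql.functions.{col, row_number}

// Number the rows within each movie.
val numbered = dataset.withColumn("rn", row_number() over windowSpec)

// A copy of the columns we need from the "previous" row, renamed to avoid clashes.
val previous = numbered.select(
  col("Movie") as "p_Movie",
  col("rn") as "p_rn",
  col("Likes") as "p_Likes")

// Join each row to its predecessor within the same movie, then compare.
val drops = numbered
  .join(previous, col("Movie") === col("p_Movie") && col("rn") === col("p_rn") + 1)
  .filter(col("Likes") < col("p_Likes"))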

HTH

Upvotes: 1
