Reputation: 915
I have two sets of data (let's call them d1, d2) in Spark. I would like to perform a two-sample Kolmogorov-Smirnov test to check whether their underlying population distribution functions differ. Can MLlib's Statistics.kolmogorovSmirnovTest do this?
The documentation provides this example:
import org.apache.spark.mllib.stat.Statistics
val data: RDD[Double] = ... // an RDD of sample data
// perform a KS test using a cumulative distribution function of our making
val myCDF: Double => Double = ...
val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF)
I tried computing the empirical cumulative distribution function of d2 (collecting it as Map) and comparing it with d1.
Statistics.kolmogorovSmirnovTest(d1, ecdf_map)
The test runs, but the results are wrong.
Am I doing something wrong? Is it possible to do this? Any ideas?
Thank you for the help!
Upvotes: 5
Views: 4479
Reputation: 1349
In Spark MLlib, kolmogorovSmirnovTest is one-sample and two-sided. So if you specifically want the two-sample variant, it's not possible within this library. However, you can still compare datasets by calculating the empirical cumulative distribution function (I found a library to do that, so I'll update this answer if the results are any good) or by using deviations from the normal distribution. In this example I'll go with the latter.
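For samples small enough to collect to the driver, the two-sample statistic itself can be computed by hand from the two empirical CDFs. The following is a minimal sketch and not part of MLlib: the names `TwoSampleKs`, `statistic` and `pValue` are hypothetical, and the p-value uses the standard asymptotic Kolmogorov series, so it should be treated as an approximation that is only reasonable for large samples.

```scala
object TwoSampleKs {

  /** Two-sample KS statistic D: the largest absolute gap between the two empirical CDFs. */
  def statistic(xs: Array[Double], ys: Array[Double]): Double = {
    require(xs.nonEmpty && ys.nonEmpty, "both samples must be non-empty")
    require(!xs.exists(_.isNaN) && !ys.exists(_.isNaN), "NaN values are not supported")
    // Sort copies so the caller's arrays are left untouched
    val a = xs.clone(); java.util.Arrays.sort(a)
    val b = ys.clone(); java.util.Arrays.sort(b)
    var i = 0
    var j = 0
    var d = 0.0
    while (i < a.length && j < b.length) {
      val v = math.min(a(i), b(j))
      // Step past every observation equal to v in both samples (handles ties)
      while (i < a.length && a(i) == v) i += 1
      while (j < b.length && b(j) == v) j += 1
      // i / a.length and j / b.length are the two ECDF values at v
      d = math.max(d, math.abs(i.toDouble / a.length - j.toDouble / b.length))
    }
    d
  }

  /** Asymptotic p-value: 2 * sum_{k>=1} (-1)^(k-1) * exp(-2 k^2 lambda^2). */
  def pValue(d: Double, n: Int, m: Int): Double = {
    val lambda = d * math.sqrt(n.toDouble * m / (n + m))
    // The alternating series converges poorly near zero; the p-value is effectively 1 there
    if (lambda < 0.2) 1.0
    else {
      val sum = (1 to 100).map { k =>
        val sign = if (k % 2 == 1) 1.0 else -1.0
        sign * math.exp(-2.0 * k * k * lambda * lambda)
      }.sum
      math.max(0.0, math.min(1.0, 2.0 * sum))
    }
  }
}
```

With the RDDs from the question this could be called as `TwoSampleKs.statistic(d1.collect(), d2.collect())`, assuming both datasets fit in driver memory. For larger data the same idea would need a distributed sort and merge, which this sketch does not attempt.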
For the purposes of this test I generated 3 distributions: 2 triangular ones that look similar and an exponential one to show a big difference in the statistics.
Note: I couldn't find any scientific papers describing this method as viable for distribution comparison, so the idea is mostly empirical.
For every distribution you could most definitely find a mirrored one with the same global maximum distance between its CDF and that of the normal distribution.
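The mirroring caveat can be made precise. The following is a short sketch of why a sample and its reflection about the sample mean get the same KS statistic against the fitted normal distribution. The symbols $F_n$, $G_n$, $\Phi_{\mu,\sigma}$, $\mu$ and $\sigma$ are notation introduced here for illustration.

```latex
% Sample x_1, ..., x_n with sample mean \mu and standard deviation \sigma.
% The mirrored sample y_i = 2\mu - x_i has the same mean and standard deviation.
\begin{align*}
F_n(t) &= \tfrac{1}{n}\,\#\{\, i : x_i \le t \,\}, \\
G_n(t) &= \tfrac{1}{n}\,\#\{\, i : 2\mu - x_i \le t \,\}
        = 1 - F_n\bigl((2\mu - t)^{-}\bigr), \\
\Phi_{\mu,\sigma}(t) &= 1 - \Phi_{\mu,\sigma}(2\mu - t)
        \quad\text{(symmetry of the normal CDF about } \mu\text{)}, \\
\sup_t \bigl| G_n(t) - \Phi_{\mu,\sigma}(t) \bigr|
       &= \sup_s \bigl| F_n(s^{-}) - \Phi_{\mu,\sigma}(s) \bigr|
        = \sup_s \bigl| F_n(s) - \Phi_{\mu,\sigma}(s) \bigr|,
        \qquad s = 2\mu - t .
\end{align*}
% The last equality holds because \Phi_{\mu,\sigma} is continuous, so taking
% left limits of the step function F_n does not change the supremum.
```

So the two samples produce the same statistic even though their shapes are mirror images, which is why equal KS results against the normal distribution do not by themselves show that two datasets share a distribution.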
The next step was to get KS results against a normal distribution with the given mean and standard deviation. I visualized them to get a better picture:
As you can see, the results (KS statistic and p-value) for the triangular distributions are close to each other, while the exponential one is way off. As I stated in the note, you could easily fool this method by mirroring a dataset, but for real-world data it could be OK.
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.stat.Statistics
import org.apache.commons.math3.distribution.{ ExponentialDistribution, TriangularDistribution }
import breeze.plot._
import breeze.linalg._
import breeze.numerics._

object Main {

  def main( args: Array[ String ] ): Unit = {

    val conf =
      new SparkConf()
        .setAppName( "SO Spark" )
        .setMaster( "local[*]" )
        .set( "spark.driver.host", "localhost" )

    val sc = new SparkContext( conf )

    // Create similar distributions
    val triDist1 = new TriangularDistribution( -3, 5, 7 )
    val triDist2 = new TriangularDistribution( -3, 7, 7 )

    // Exponential distribution to show big difference
    val expDist1 = new ExponentialDistribution( 0.6 )

    // Sample data from the distributions and parallelize it
    val n = 100000
    val sampledTriDist1 = sc.parallelize( triDist1.sample( n ) )
    val sampledTriDist2 = sc.parallelize( triDist2.sample( n ) )
    val sampledExpDist1 = sc.parallelize( expDist1.sample( n ) )

    // KS tests
    val resultTriDist1 = Statistics
      .kolmogorovSmirnovTest( sampledTriDist1,
                              "norm",
                              sampledTriDist1.mean,
                              sampledTriDist1.stdev )
    val resultTriDist2 = Statistics
      .kolmogorovSmirnovTest( sampledTriDist2,
                              "norm",
                              sampledTriDist2.mean,
                              sampledTriDist2.stdev )
    val resultExpDist1 = Statistics
      .kolmogorovSmirnovTest( sampledExpDist1,
                              "norm",
                              sampledExpDist1.mean,
                              sampledExpDist1.stdev )

    // Results
    val statsTriDist1 =
      "Tri1: ( " +
        resultTriDist1.statistic +
        ", " +
        resultTriDist1.pValue +
        " )"
    val statsTriDist2 =
      "Tri2: ( " +
        resultTriDist2.statistic +
        ", " +
        resultTriDist2.pValue +
        " )"
    val statsExpDist1 =
      "Exp1: ( " +
        resultExpDist1.statistic +
        ", " +
        resultExpDist1.pValue +
        " )"

    println( statsTriDist1 )
    println( statsTriDist2 )
    println( statsExpDist1 )

    // Visualize
    val graphCanvas = Figure()
    val mainPlot =
      graphCanvas
        .subplot( 0 )
    mainPlot.legend = true

    val x = linspace( 1, n, n )
    mainPlot += plot( x,
                      sampledTriDist1.sortBy( x => x ).take( n ),
                      name = statsTriDist1 )
    mainPlot += plot( x,
                      sampledTriDist2.sortBy( x => x ).take( n ),
                      name = statsTriDist2 )
    mainPlot += plot( x,
                      sampledExpDist1.sortBy( x => x ).take( n ),
                      name = statsExpDist1 )

    mainPlot.xlabel = "x"
    mainPlot.ylabel = "sorted sample"
    mainPlot.title = "KS results for 2 Triangular and 1 Exponential Distributions"

    graphCanvas.saveas( "ks-sample.png", 300 )

    sc.stop()
  }
}
Upvotes: 7