Mara
Mara

Reputation: 915

Apache Spark - Two-sample Kolmogorov-Smirnov Test

I have two sets of data (let's call them d1, d2) in Spark. I would like to perform a Two-sample Kolmogorov-Smirnov test, to test wether their underlying poplation distribution function is different. Can MLLib's Statistics.kolmogorovSmirnovTest do this?

The documentation provides this example:

import org.apache.spark.mllib.stat.Statistics

val data: RDD[Double] = ... // an RDD of sample data

// perform a KS test using a cumulative distribution function of our making
val myCDF: Double => Double = ...
val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF)

I tried computing the empirical cumulative distribution function of d2 (collecting it as Map) and comparing it with d1.

Statistics.kolmogorovSmirnovTest(d1, ecdf_map)

The test runs, but the results are wrong.

Am I doing something wrong? Is it possible to do this? Any ideas?

Thank you for the help!

Upvotes: 5

Views: 4479

Answers (1)

Denis Iakunchikov
Denis Iakunchikov

Reputation: 1349

In Spark Mllib KolmogorovSmirnovTest is one-sampled and two-sided. So if you want specificly two-sampled variant it's not possible within this library. However, you can still compare datasets by calculating empirical cumulative distribution function (I found a library to do that so I'll update this answer if the results will be any good) or using deviations from normal distribution. In this example I'll go with the later.

Comparing datasets by KST statistics against normal distribution

For the purposes of this testing I generated 3 distributions: 2 triangular that look similar and an exponential one to show big difference in stats.

Note: I couldn't find any scientific papers describing this method as viable for distribution comparison so the idea is mostly empirical.

For every distribution you most definetely could find a mirrored one with the same global maximum distance between its CDF and normal distribution.

Next step was to get KS results against normal distribution with given mean and standart deviation. I visualized them to get a better picture:

breeze visualisation for distributions and their KS test results

As you can see, results (KS statistics and p-value) for triangual distributions are close to each other while exponential one is way off. As I stated in the note you could easily fool this method by mirroring dataset but for a real world data it could be ok.

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.stat.Statistics

import org.apache.commons.math3.distribution.{ ExponentialDistribution, TriangularDistribution }

import breeze.plot._
import breeze.linalg._
import breeze.numerics._

object Main {

    def main( args: Array[ String ] ): Unit = {

        val conf = 
            new SparkConf()
            .setAppName( "SO Spark" )
            .setMaster( "local[*]" )
            .set( "spark.driver.host", "localhost" )

        val sc = new SparkContext( conf )

        // Create similar distributions
        val triDist1 = new TriangularDistribution( -3, 5, 7 )
        val triDist2 = new TriangularDistribution( -3, 7, 7 )

        // Exponential distribution to show big difference
        val expDist1 = new ExponentialDistribution( 0.6 )

        // Sample data from the distributions and parallelize it
        val n = 100000
        val sampledTriDist1 = sc.parallelize( triDist1.sample( n ) )
        val sampledTriDist2 = sc.parallelize( triDist2.sample( n ) )
        val sampledExpDist1 = sc.parallelize( expDist1.sample( n ) )

        // KS tests
        val resultTriDist1 = Statistics
            .kolmogorovSmirnovTest( sampledTriDist1, 
                                    "norm", 
                                    sampledTriDist1.mean, 
                                    sampledTriDist1.stdev )

        val resultTriDist2 = Statistics
            .kolmogorovSmirnovTest( sampledTriDist2, 
                                    "norm", 
                                    sampledTriDist2.mean, 
                                    sampledTriDist2.stdev )

        val resultExpDist1 = Statistics
            .kolmogorovSmirnovTest( sampledExpDist1, 
                                    "norm", 
                                    sampledExpDist1.mean, 
                                    sampledExpDist1.stdev )

        // Results
        val statsTriDist1 = 
            "Tri1: ( " + 
            resultTriDist1.statistic + 
            ", " + 
            resultTriDist1.pValue + 
            " )"

        val statsTriDist2 = 
            "Tri2: ( " + 
            resultTriDist2.statistic + 
            ", " + 
            resultTriDist2.pValue + 
            " )"

        val statsExpDist1 = 
            "Exp1: ( " + 
            resultExpDist1.statistic + 
            ", " + 
            resultExpDist1.pValue + 
            " )"  

        println( statsTriDist1 )
        println( statsTriDist2 )
        println( statsExpDist1 )

        // Visualize
        val graphCanvas = Figure()

        val mainPlot = 
            graphCanvas
            .subplot( 0 )

        mainPlot.legend = true

        val x = linspace( 1, n, n )      

        mainPlot += plot( x, 
                          sampledTriDist1.sortBy( x => x ).take( n ), 
                          name = statsTriDist1 )

        mainPlot += plot( x, 
                          sampledTriDist2.sortBy( x => x ).take( n ), 
                          name = statsTriDist2 )

        mainPlot += plot( x, 
                          sampledExpDist1.sortBy( x => x ).take( n ), 
                          name = statsExpDist1 )

        mainPlot.xlabel = "x"
        mainPlot.ylabel = "sorted sample"

        mainPlot.title = "KS results for 2 Triangular and 1 Exponential Distributions"

        graphCanvas.saveas( "ks-sample.png", 300 )

        sc.stop()
    }
}

Upvotes: 7

Related Questions