thentangler
thentangler

Reputation: 1256

Aggregating Kolmogrov Smirnov Test in pyspark

Is there a way to apply the KS test from the spark.mllib library in pyspark using the groupBy clause or some method of aggregation? For example, I have a dataframe df with columns ID and RESULT like so:

+-------+------+
|     ID|RESULT|
+-------+------+
|3648296|  2.73|
|3648296|  9.64|
|3648189|  0.03|
|3648189|  0.03|
|3648296|  2.51|
|3648189|  0.01|
|3648296|  1.75|
|3648296| 30.23|
|3648189|  0.02|
|3648189|  0.02|
|3648189|  0.02|
|3648296|  3.28|
|3648296| 32.55|
|3648296|  2.32|
|3648296| 34.58|
|3648296| 29.22|
|3648189|  0.02|
|3648296|  1.36|
|3648296|  1.64|
|3648296|  1.17|
+-------+------+

There are 2 IDs 3648296 and 3648189 and each of their corresponding RESULT values are in the order of a few hundred thousand. Is it possible to apply a groupBy function like so:

from pyspark.mllib.stat import Statistics

normtest=df.groupBy('ID').Statistics.kolmogorovSmirnovTest(df.RESULT, "norm", 0, 1)

such that I get an output dataframe like:

+-------+---------+----------+
|     ID|p-value  |statistic |
+-------+---------+----------+
|3648296|some val | some val |
|3648189|some val | some val |
+-------+---------+----------+

is this possible?

Upvotes: 2

Views: 889

Answers (1)

Liran Funaro
Liran Funaro

Reputation: 2878

This can be solved by binning the data, then performing Kolmogorov-Smirnov Test on the binned data (i.e., histogram). It won't produce the largest distance, but if your effective distribution is smooth, then the result should be close enough.

By bucketing the results, you ensure that only a limited number of items (the number of buckets) will be loaded into memory at a time.

First, we need to implement a histogram version of the kstest:

import numpy as np

def hist_kstest(hist: np.array, bin_edges: np.array, cdf):
    i = hist.cumsum()
    n = i[-1]

    bin_right_edges = bin_edges[1:]
    cdf_vals = cdf(bin_right_edges)
    
    statistic = np.max([
        cdf_vals - (i-1) / n,
        i / n - cdf_vals
    ])
    pvalue = stats.distributions.kstwo.sf(statistic, n)
    return statistic, pvalue

Then use it as follows:

from pyspark.sql import functions as F, types as T
from pyspark.ml.feature import QuantileDiscretizer
import pandas as pd
import numpy as np
from scipy import stats

# Choose the number of buckets. It depends on your memory
# availability and affects the accuracy of the test.
num_buckets = 1_000

# Choose the null hypothesis (H0)
h0_cdf = stats.norm(0, 1).cdf

# Bucket the result and get the buckets' edges
bucketizer = QuantileDiscretizer(
    numBuckets=num_buckets, inputCol='RESULT', outputCol='result_bucket'
).setHandleInvalid("keep").fit(df)
buckets = np.array(bucketizer.getSplits())

def kstest(key, pdf: pd.DataFrame):
    pdf.sort_values('result_bucket', inplace=True)
    hist = pdf['count'].to_numpy()
    # Some of the buckets might not appear in all the groups, so
    # we filter buckets that are not available.
    bin_edges = buckets[[0, *(pdf['result_bucket'].to_numpy() + 1)]]
    statistic, pvalue = hist_kstest(hist, bin_edges, h0_cdf)
    return pd.DataFrame([[*key, statistic, pvalue]])

df = bucketizer.transform(df).groupBy("ID", "result_bucket").agg(
    F.count("*").alias("count")
).groupby("ID").applyInPandas(kstest, "ID long, statistic double, pvalue double")

Upvotes: 2

Related Questions