Reputation: 21
I would like to calculate the pairwise Kendall's tau rank correlation for a large Spark DataFrame. It is large (say, 10M rows with 10K columns), so it cannot be converted to a pandas DataFrame and computed with pandas.DataFrame.corr.
Also, each column may have null values, so when calculating the pairwise Kendall's tau, the rows with null values in either of the two columns need to be excluded.
I checked pyspark.mllib.stat.Statistics.corr, but it only supports "pearson" and "spearman":
from pyspark.mllib.stat import Statistics

df_rdd = df.rdd.map(lambda row: row[0:])
corr_mat = Statistics.corr(df_rdd, method='spearman')
Spearman might serve as a substitute for Kendall. However, Statistics.corr does not exclude null values, so the returned correlation matrix is impacted by them (if a column contains any null, all its correlations with the other columns come back as null).
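To be clear, a simple listwise-deletion workaround along these lines is not what I want either, since it drops a row even if only one unrelated column is null:

from pyspark.mllib.stat import Statistics

#drop every row containing any null, then correlate (listwise deletion,
#not the pairwise exclusion I actually need)
clean_rdd = df.dropna().rdd.map(lambda row: row[0:])
corr_mat = Statistics.corr(clean_rdd, method='spearman')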
Has anyone encountered the same issue? Breaking the columns into chunks would only give block-wise correlation matrices, and looping through all pairs is excruciatingly slow...
Thank you!!
Upvotes: 2
Views: 2076
Reputation: 1
The last part of the DataBach answer, the assignment to tau, appears to "mix and match" the Wikipedia formulas cited in the comment above it. You only need the binomial coefficient 0.5 * n * (n-1) with the second formula, the one based on discordant pair counts alone. When using the difference between concordant and discordant pairs, as this code does, you have to divide by the number of ordered pairs n * (n-1) instead, because the cartesian product counts every unordered pair twice. My test cases also reveal that you need to change d["c"] to d.get("c", 0) and d["d"] to d.get("d", 0) before starting, since a key is missing from d whenever no pair of that kind occurs.
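For reference, the standard no-ties form of the coefficient is

\tau = \frac{n_c - n_d}{\binom{n}{2}} = \frac{n_c - n_d}{\tfrac{1}{2}\,n(n-1)}

where n_c and n_d count unordered pairs; the code below counts every pair in both orders, so both counts are doubled and the effective denominator must be n(n-1).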
A simple test case using the input vector [1, 2, 3, 4] for both variables returns 2.0 with the above formula; if you leave out the factor 0.5, you get the expected 1.0. A second test case with [4, 3, 2, 1] as the second vector, the reverse of the first one, gives -1.0 with the corrected formula.
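Putting it together, a minimal sketch of the corrected last line (reusing n and d exactly as defined in the answer below):

#divide by the number of ordered pairs n*(n-1), since the cartesian product
#counts each unordered pair twice; missing keys default to 0
tau = (d.get("c", 0) - d.get("d", 0)) / float(n * (n - 1))

With this change, both test cases above return the expected values.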
Upvotes: 0
Reputation: 1635
Kendall's rank correlation is not yet supported in Spark. However, if this is not too late for you, I found the following code that you can use to calculate it.
Here is an example:
from operator import add
#sample data in lists
variable_1 = [106, 86, 100, 101, 99, 103, 97, 113, 112, 110]
variable_2 = [7, 0, 27, 50, 28, 29, 20, 12, 6, 17]
#zip sample data and convert to rdd
example_data = zip(variable_1, variable_2)
example_rdd = sc.parallelize(example_data)
#filter out null values; a row is removed if either of its elements is null or empty
example_rdd = example_rdd.filter(lambda x: x is not None and None not in x and "" not in x)
#take the cartesian product of example data (generate all possible combinations)
all_pairs = example_rdd.cartesian(example_rdd)
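#note: cartesian() emits all n*n ordered pairs, including each point paired
#with itself, so the concordant and discordant counts below are each doubled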
#function classifying each pair as concordant, discordant or tied
def calc(pair):
    p1, p2 = pair
    x1, y1 = p1
    x2, y2 = p2
    if (x1 == x2) and (y1 == y2):
        return ("t", 1)  #tie
    elif ((x1 > x2) and (y1 > y2)) or ((x1 < x2) and (y1 < y2)):
        return ("c", 1)  #concordant pair
    else:
        return ("d", 1)  #discordant pair (also catches pairs tied in just one variable)
#classify all pairs as concordant / discordant / tied with calc(), then return the results
results = all_pairs.map(calc)
#aggregate the results
results = results.aggregateByKey(0, add, add)
#count and collect
n = example_rdd.count()
d = {k: v for (k, v) in results.collect()}
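#d maps "c", "d" and "t" to their counts; a key is absent when no pair of that kind occurred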
# http://en.wikipedia.org/wiki/Kendall_tau_rank_correlation_coefficient
tau = (d["c"] - d["d"]) / (0.5 * n * (n-1))
Maybe this helps, or at least serves as a reference for the future.
Upvotes: 1