Reputation: 517
I'm trying to build a generic way to calculate a distance matrix of many sparse vectors (100k vectors with a length of 250k). In my example the data is represented as a scipy CSR matrix. This is what I'm doing:
First I define a function to transform the CSR rows into PySpark SparseVectors:
from pyspark.mllib.linalg import SparseVector  # RDD-based MLlib vector class

def csr_to_sparse_vector(row):
    return SparseVector(row.shape[1], sorted(row.indices), row.data)
Now I transform the rows into vectors and save them to a list which I then feed to the SparkContext:
sparse_vectors = [csr_to_sparse_vector(row) for row in refs_sample]
rdd = sc.parallelize(sparse_vectors)
In the next step I use the cartesian() function to build all the pairs (similar to this post: Pyspark calculate custom distance between all vectors in a RDD).
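For reference, a minimal sketch of that pairing step (not shown explicitly above; rdd2 is the name used further down):

# Build all pairs; each element of rdd2 is a (SparseVector, SparseVector) tuple
rdd2 = rdd.cartesian(rdd)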
In this experiment I want to use the Jaccard similarity, which is defined as follows:
def jacc_sim(pair):
    dot_product = pair[0].dot(pair[1])
    try:
        sim = dot_product / (pair[0].numNonzeros() + pair[1].numNonzeros())
    except ZeroDivisionError:
        return 0.0
    return sim
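To illustrate what jacc_sim returns, here is a small sanity check with two toy SparseVectors (purely illustrative, not part of the actual pipeline):

# Two toy vectors of length 5 that overlap only at index 2
v1 = SparseVector(5, [0, 2], [1.0, 1.0])
v2 = SparseVector(5, [2, 4], [1.0, 1.0])
print(jacc_sim((v1, v2)))  # dot = 1.0, nonzeros = 2 + 2, so this prints 0.25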
Now I should just map the function and collect the results:
distance_matrix = rdd2.map(lambda x: jacc_sim(x)).collect()
I'm running this code on a small sample with only 100 documents, on both a local machine and a cluster with 180 nodes. The task takes forever and finally crashes: https://pastebin.com/UwLUXvUZ
Any suggestions what might be wrong?
Additionally, if the distance measure is symmetric, i.e. sim(x,y) == sim(y,x), we only need the upper triangle of the matrix. I found a post that solves this problem by filtering (Upper triangle of cartesian in spark for symmetric operations: `x*(x+1)//2` instead of `x**2`):
rdd2 = rdd.cartesian(rdd).filter(lambda x: x[0] < x[1])
But this doesn't work for the list of SparseVectors.
Upvotes: 2
Views: 2698
Reputation: 517
The problem was a configuration error that caused my data to be split into 1000 partitions. The solution was simply to tell Spark explicitly how many partitions it should create (e.g. 10):
rdd = sc.parallelize(sparse_vectors, 10)
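A quick way to verify that the explicit partitioning took effect before kicking off the expensive cartesian step:

print(rdd.getNumPartitions())  # should print 10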
Moreover, I extended the list of sparse vectors with an enumeration; this way I could filter out pairs that are not part of the upper triangle of the matrix:
sparse_vectors = [(i, csr_to_sparse_vector(row)) for i, row in enumerate(authors)]
rdd = sc.parallelize(sparse_vectors, 10)
rdd2 = rdd.cartesian(rdd).filter(lambda x: x[0][0] < x[1][0])
rdd2.map(lambda x: jacc_sim(x)).filter(lambda x: x is not None).saveAsTextFile('hdfs:///user/username/similarities')
The corresponding similarity function looks like this:
def jacc_sim(pair):
    id_0 = pair[0][0]
    vec_0 = pair[0][1]
    id_1 = pair[1][0]
    vec_1 = pair[1][1]
    dot_product = vec_0.dot(vec_1)
    try:
        sim = dot_product / (vec_0.numNonzeros() + vec_1.numNonzeros())
        if sim > 0:
            return (id_0, id_1, sim)
    except ZeroDivisionError:
        pass
    return None
This worked very well for me and I hope someone else will find it useful as well!
Upvotes: 3
Reputation: 422
Is it the list itself that's problematic, or the fact that it is made up of SparseVectors? One thought is to try converting the SparseVectors to DenseVectors, a suggestion I found here (Convert Sparse Vector to Dense Vector in Pyspark). The calculation result is no different, only how Spark handles it.
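A minimal sketch of that conversion, assuming the sparse_vectors list from the question and the RDD-based MLlib vector classes:

from pyspark.mllib.linalg import DenseVector

# Convert each SparseVector to a DenseVector before parallelizing;
# jacc_sim should still work, since DenseVector also provides dot() and numNonzeros()
dense_vectors = [DenseVector(sv.toArray()) for sv in sparse_vectors]
rdd = sc.parallelize(dense_vectors)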
Upvotes: 0