nadre

Reputation: 517

pyspark calculate distance matrix of sparse vectors

I'm trying to build a generic way to calculate a distance matrix for many sparse vectors (100k vectors, each of length 250k). In my example the data is represented as a scipy csr matrix. This is what I'm doing:

First I'm defining a method to transform the csr rows to pyspark SparseVectors:

from pyspark.mllib.linalg import SparseVector  # or pyspark.ml.linalg, depending on the API in use

def csr_to_sparse_vector(row):
    # row is a 1 x N csr_matrix; .indices holds its nonzero column positions, .data the values
    return SparseVector(row.shape[1], sorted(row.indices), row.data)
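
For context, a minimal sketch of that conversion on a tiny, made-up scipy matrix (hypothetical values, reusing the function above) could look like this:

import numpy as np
from scipy.sparse import csr_matrix

sample = csr_matrix(np.array([[0., 1., 0., 2.],
                              [3., 0., 0., 0.]]))
vectors = [csr_to_sparse_vector(row) for row in sample]
# vectors[0] -> SparseVector(4, {1: 1.0, 3: 2.0})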

Now I transform the rows into vectors and store them in a list, which I then feed to the SparkContext:

sparse_vectors = [csr_to_sparse_vector(row) for row in refs_sample]
rdd = sc.parallelize(sparse_vectors)

In the next step I use the cartesian function to build all the pairs (similar to this post: Pyspark calculate custom distance between all vectors in a RDD).
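
The cartesian step itself isn't shown; presumably it is something along these lines (rdd2 is the name used further down):

rdd2 = rdd.cartesian(rdd)  # all ordered pairs of vectors, n**2 of them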

In this experiment I want to use the Jaccard similarity, which I define as follows:

def jacc_sim(pair):
    # overlap between the two vectors' nonzero entries
    dot_product = pair[0].dot(pair[1])
    try:
        # normalize by the total number of nonzero entries in both vectors
        sim = dot_product / (pair[0].numNonzeros() + pair[1].numNonzeros())
    except ZeroDivisionError:
        return 0.0
    return sim
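
As a quick sanity check with two tiny made-up vectors (note that this formula divides by nnz_a + nnz_b, whereas the classic binary Jaccard index would divide by nnz_a + nnz_b - dot):

from pyspark.mllib.linalg import SparseVector

a = SparseVector(5, [0, 2, 3], [1.0, 1.0, 1.0])
b = SparseVector(5, [2, 3, 4], [1.0, 1.0, 1.0])
print(jacc_sim((a, b)))  # 2.0 / (3 + 3) = 0.333...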

Now I should just map the function and collect the results:

distance_matrix = rdd2.map(lambda x: jacc_sim(x)).collect()

I'm running this code on a small sample with only 100 documents, both on a local machine and on a cluster with 180 nodes. The task takes forever and eventually crashes: https://pastebin.com/UwLUXvUZ

Any suggestions what might be wrong?

Additionally, if the distance measure is symmetric, i.e. sim(x,y) == sim(y,x), we only need the upper triangle of the matrix (x*(x+1)//2 pairs instead of x**2). I found a post that solves this problem by filtering (Upper triangle of cartesian in spark for symmetric operations):

rdd2 = rdd.cartesian(rdd).filter(lambda x: x[0] < x[1])

But this doesn't work for the list of SparseVectors, since SparseVectors don't define a meaningful ordering for the < comparison.

Upvotes: 2

Views: 2698

Answers (2)

nadre

Reputation: 517

The problem was a configuration error that caused my data to be split into 1000 partitions. The solution was simply to tell Spark explicitly how many partitions it should create (e.g. 10):

rdd = sc.parallelize(sparse_vectors, 10)
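
As a quick sanity check, the resulting partition count can be read back directly from the RDD:

print(rdd.getNumPartitions())  # 10 with the explicit setting above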

Moreover, I extended the list of sparse vectors with an enumeration; this way I could then filter out pairs that are not part of the upper triangle of the matrix:

sparse_vectors = [(i, csr_to_sparse_vector(row)) for i, row in enumerate(authors)]
rdd = sc.parallelize(sparse_vectors, 10)
# keep only pairs with index i < j, i.e. the upper triangle of the similarity matrix
rdd2 = rdd.cartesian(rdd).filter(lambda x: x[0][0] < x[1][0])
rdd2.map(lambda x: jacc_sim(x)).filter(lambda x: x is not None).saveAsTextFile('hdfs:///user/username/similarities')

The corresponding similarity function looks like this:

def jacc_sim(pair):
    # each element of the pair is an (index, SparseVector) tuple
    id_0 = pair[0][0]
    vec_0 = pair[0][1]
    id_1 = pair[1][0]
    vec_1 = pair[1][1]
    dot_product = vec_0.dot(vec_1)
    try:
        sim = dot_product / (vec_0.numNonzeros() + vec_1.numNonzeros())
        if sim > 0:
            # only emit pairs with a positive similarity
            return (id_0, id_1, sim)
    except ZeroDivisionError:
        pass
    return None
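
As a small illustration with two made-up indexed vectors, the function returns an (id, id, similarity) triple only when the similarity is positive:

from pyspark.mllib.linalg import SparseVector

pair = ((0, SparseVector(5, [0, 2], [1.0, 1.0])),
        (1, SparseVector(5, [2, 4], [1.0, 1.0])))
print(jacc_sim(pair))  # (0, 1, 0.25)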

This worked very well for me and I hope someone else will find it useful as well!

Upvotes: 3

MisterJT

Reputation: 422

Is it the list itself that's problematic, or the fact that it is made up of SparseVectors? One thought is to try converting the SparseVectors to DenseVectors, a suggestion I found here (Convert Sparse Vector to Dense Vector in Pyspark). The calculation result is no different; it only changes how Spark handles it.
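
A minimal sketch of one way to do that conversion (hypothetical values, going through a NumPy array):

from pyspark.mllib.linalg import DenseVector, SparseVector

sv = SparseVector(4, [1, 3], [1.0, 2.0])
dv = DenseVector(sv.toArray())  # DenseVector([0.0, 1.0, 0.0, 2.0])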

Upvotes: 0
