Justin Pihony

Reputation: 67115

How to use RowMatrix.columnSimilarities (similarity search)

TL;DR: I am trying to train on an existing data set (Seq[Words] with corresponding categories) and use that trained data to filter another dataset using category similarity.

I am trying to train on a corpus of data and then use it for text analysis*. I've tried using NaiveBayes, but that seems to only work with the data you have, so its predict method will always return something, even if it doesn't match anything.

So, I am now trying to use TF-IDF, passing that output into a RowMatrix and computing the similarities. But I'm not sure how to run my query (a single word for now). Here's what I've tried:

val rddOfTfidfFromCorpus: RDD[Vector] = ??? // TF-IDF vectors from the training corpus
val query = "word"
val tf = new HashingTF().transform(List(query))
val tfIDF = new IDF().fit(sc.makeRDD(List(tf))).transform(tf)
val mergedVectors = rddOfTfidfFromCorpus.union(sc.makeRDD(List(tfIDF)))
val similarities = new RowMatrix(mergedVectors).columnSimilarities(1.0)

Here is where I'm stuck (if I've even done everything right up to this point). I tried filtering the similarity entries (i, j) down to the ones involving my query's TF-IDF vector and ended up with an empty collection.

The gist is that I want to train on a corpus of data and find what category a new document falls into. The above code is at least trying to narrow it down to one category and check whether I can get a prediction from that...

*Note that this is a toy example, so I only need something that works well enough.

*I am using Spark 1.4.0.

Upvotes: 1

Views: 2420

Answers (1)

zero323

Reputation: 330353

Using columnSimilarities doesn't make sense here. Since each column in your matrix represents a set of terms, you'll get a matrix of similarities between tokens, not documents. You could transpose the matrix and then use columnSimilarities, but as far as I understand, what you want is the similarity between the query and the corpus. You can express that using matrix multiplication as follows:

For starters you'll need an IDFModel you've trained on the corpus. Let's assume it is called idf:

import org.apache.spark.mllib.feature.IDFModel
val idf: IDFModel = ??? // Trained using corpus data

and a small helper:

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

def toBlockMatrix(rdd: RDD[Vector]) = new IndexedRowMatrix(
  rdd.zipWithIndex.map { case (v, i) => IndexedRow(i, v) }
).toCoordinateMatrix.toBlockMatrix

First, let's convert the query to an RDD and compute its TF:

val query: Seq[String] = ???
val queryTf = new HashingTF().transform(query)

Next we can apply the IDF model and convert the result to a matrix:

val queryTfidf = idf.transform(queryTf)
val queryMatrix = toBlockMatrix(queryTfidf)

We'll need a corpus matrix as well:

val corpusMatrix = toBlockMatrix(rddOfTfidfFromCorpus)

If you multiply the two, you get a matrix with the number of rows equal to the number of documents in the query and the number of columns equal to the number of documents in the corpus.

val dotProducts = queryMatrix.multiply(corpusMatrix.transpose)

To get a proper cosine similarity you have to divide each entry by the product of the corresponding row magnitudes, but that should be easy enough to handle.
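A rough sketch of that normalization (not part of the original answer; it assumes the query and corpus are small enough that the norm maps fit in driver memory — for large data you would keep them as RDDs and join instead):

```scala
import org.apache.spark.mllib.linalg.Vectors

// L2 norms of the query and corpus rows, keyed by row index.
val queryNorms = queryTfidf.zipWithIndex
  .map { case (v, i) => (i, Vectors.norm(v, 2)) }.collectAsMap
val corpusNorms = rddOfTfidfFromCorpus.zipWithIndex
  .map { case (v, i) => (i, Vectors.norm(v, 2)) }.collectAsMap

// Divide each dot product by the product of magnitudes to get cosine similarity
// as (queryDocIndex, corpusDocIndex, similarity) triples.
val cosineSimilarities = dotProducts.toCoordinateMatrix.entries.map { entry =>
  (entry.i, entry.j, entry.value / (queryNorms(entry.i) * corpusNorms(entry.j)))
}
```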

There are two problems here. First of all, it is rather expensive. Moreover, I am not sure if it is really useful. To reduce the cost you can apply some dimensionality reduction algorithm first, but let's leave that for now.
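If you do want to go down that road, one option available in MLlib 1.4 is a truncated SVD on the corpus matrix (LSA-style); the number of latent dimensions below is a made-up placeholder:

```scala
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val k = 50 // hypothetical number of latent dimensions
val corpusRowMatrix = new RowMatrix(rddOfTfidfFromCorpus)
val svd = corpusRowMatrix.computeSVD(k, computeU = true)
// svd.U holds the corpus documents projected into k dimensions;
// svd.s and svd.V are what you'd need to project a query into the same space.
```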

Judging from the following statement

NaiveBayes (...) seems to only work with the data you have, so its predict method will always return something, even if it doesn't match anything.

I guess you want some kind of unsupervised learning method. The simplest thing you can try is K-means:

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}

val numClusters: Int = ???
val numIterations = 20

val model = KMeans.train(rddOfTfidfFromCorpus, numClusters, numIterations)
val predictions = model.predict(queryTfidf)
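From there, one hypothetical way to use the predictions is to pull out the corpus documents that land in the same cluster as the (single-document) query:

```scala
// Cluster id for each corpus document, paired with its row index.
val corpusClusters = model.predict(rddOfTfidfFromCorpus).zipWithIndex

// Keep the indices of corpus documents sharing the query's cluster.
val queryCluster = predictions.first
val sameClusterDocs = corpusClusters
  .filter { case (cluster, _) => cluster == queryCluster }
  .map { case (_, docIndex) => docIndex }
```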

Upvotes: 1

Related Questions