Reputation: 2104
I am trying to calculate Jaccard distance between certain ids with their attributes in the form of SparseVectors.
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.sql.functions import monotonically_increasing_id
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
sqlContext = SQLContext(sc)
df = sqlContext.read.load("path")
par = udf(lambda s: Vectors.parse(s), VectorUDT())
d = df_filtered.select("id",par("vect"))
from pyspark.ml.linalg import VectorUDT as VectorUDTML
as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDTML())
result = d.withColumn("<lambda>(vect)", as_ml("<lambda>(vect)"))
mh = MinHashLSH(inputCol="<lambda>(vect)", outputCol="hashes", seed=12345, numHashTables=15)
model = mh.fit(df)
a = model.transform(df)
jd = model.approxSimilarityJoin(a, a,1.0 , distCol="JaccardDistance").select(
col("datasetA.id1").alias("idA"),
col("datasetB.id1").alias("idB"),
col("JaccardDistance"))
The df has the two columns, id
and sparse_vector
. id
column is an alphanumeric id and sparse_vector
columns contains records like this SparseVector(243775, {0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0, 6: 1.0, 7: 1.0, 8: 1.0, 9: 1.0, 10: 1.0, 11: 1.0, 12: 1.0, 13: 1.0, 14: 1.0, 15: 1.0, 16: 1.0, 24: 1.0, 30: 1.0, 31: 1.0, 32: 1.0, 61: 1.0, 88: 1.0, 90: 1.0, 96: 1.0, 104: 1.0, 153: 1.0, 155: 1.0, 159: 1.0, 160: 1.0, 161: 1.0, 162: 1.0, 169: 1.0, 181: 1.0, 194: 1.0, 212: 1.0, 220: 1.0, 222: 1.0, 232: 1.0, 303: 1.0, 390: 1.0, 427: 1.0, 506: 1.0, 508: 1.0, 509: 1.0, 518: 1.0, 554: 1.0, 568: 1.0, 798: 1.0, 1431: 1.0, 2103: 1.0, 2139: 1.0, 3406: 1.0, 3411: 1.0, 3415: 1.0, 3429: 1.0, 3431: 1.0, 3440: 1.0, 3443: 1.0, 3449: 1.0}))
When I compute the Jaccard and write data down, i am missing a lot of id pairs. There are a total of 45k identities in the data so the output should contain roughly 45k*45k pairs.
Also I get all possible pairs when i compare just 1k ids to 45k ids and do all ids that way, sort of like batches. Any input will be helpful. Also, can I parallelize code so that I have batch system faster? I am running code on an emr cluster and have resources to increase cluster size.
The following script can be used to generate a sample data with id and artificially generated sparse vectors.
from random import randint
from collections import OrderedDict
with open('/mnt/lsh_data.csv', 'a') as the_file:
the_file.write("id\vect\n")
for i in range(45000):
a = "id"
b = a + str(i)
num_ent = randint(101, 195) + randint(102, 200)
lis = []
for j in range(num_ent):
lis.append(randint(0, 599999))
lis.sort()
l = list(OrderedDict.fromkeys(lis))
data = []
for j in range(len(l)):
c = randint(0,1)
if c == 0:
data.append(1.0)
else:
data.append(0.0)
b = b + "\t(600000,"+str(l)+","+str(data)+")\n"
the_file.write(b)
Upvotes: 1
Views: 1262
Reputation: 1459
Examining the approxSimilarityJoin
source code you can see that the function first performs a join on the locality sensitive hash (LSH) of each input vector which "hashes similar input items into the same buckets with high probability." It then calculates distances on the result. The effect being that the distance is only calculated between vectors that end up in the same bucket after taking the LSH of each vector. That is why you don't see distances for all pairs in the input dataset, only for pairs of vectors that end up in the same bucket.
Furthermore, the inputs to the LSH are an input vector from the data and the random coefficients derived from the initial seed, explaining why changing the seed changes the bucketing and hence the output.
If you experiment by changing the value of the MinHashLSH
seed
parameter you can see the bucketing change.
Upvotes: 0
Reputation: 1174
Not really an answer but too long for a comment:
I'm not exactly sure how the approxSimilarityJoin
works and what the expected output is. However I checked the example given in the docs (http://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html?highlight=minhash%20lsh#pyspark.ml.feature.MinHashLSH) which is only 3 x 3 and even there we do not get the full cross product (even if we increase the threshold). So maybe this is not the expected output...
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.ml.feature import MinHashLSH
data = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
(1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
(2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
df = spark.createDataFrame(data, ["id", "features"])
mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345)
model = mh.fit(df)
model.transform(df).head()
data2 = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
(4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
(5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
df2 = spark.createDataFrame(data2, ["id", "features"])
model.approxSimilarityJoin(df, df2, 1.0, distCol="JaccardDistance").show()
Upvotes: 0