Reputation: 1079
I need to do a fuzzy join between two large datasets (roughly 30 GB each) based on the similarity of two string columns. For example:
Table 1:
Key1 | Value1
----------------------------
1    | qsdm fkq jmsk fqj msdk
Table 2:
Key2 | Value2
----------------------------
1    | qsdm fkqj mskf qjm sdk
The goal is to compute the cosine similarity between each row of Value1 and each row of Value2; then, given a predefined threshold, I can join the two tables.
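For reference, the cosine similarity is dot(A, B) / (||A|| * ||B||) over the token-weight vectors of the two cells. With the example rows above and, purely for illustration, uniform token weights, the only shared token is "qsdm" out of five tokens on each side, so the similarity would be 1 / (sqrt(5) * sqrt(5)) = 0.2; with TF-IDF weights the value changes, but the thresholding works the same way.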
Keywords: entity resolution, cosine similarity, inverted indices (to reduce the number of similarity computations), TF-IDF, token weight, words, document (a cell in a value column), dataset.
I use Spark (PySpark) to compute the join. At one point in the process, I have:
Using an inverted-index strategy, I have reduced the number of similarity computations between pairs of documents (strings). The result is an RDD, CommonTokens, whose elements are ((key1, key2), tokens): key1 is a key in Table 1, key2 is a key in Table 2, and tokens is the list of tokens shared by value1 and value2. For each element of CommonTokens, I compute the cosine similarity to produce ((key1, key2), similarity).
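For readers who want to reproduce this step, here is a minimal sketch (not the exact code from the post) of how CommonTokens can be built with an inverted index, using the example rows above; sc is the SparkContext and rdd1/rdd2 stand for the keyed tables.

rdd1 = sc.parallelize([(1, "qsdm fkq jmsk fqj msdk")])   # (key1, value1)
rdd2 = sc.parallelize([(1, "qsdm fkqj mskf qjm sdk")])   # (key2, value2)

# Inverted indices: one (token, key) record per distinct token of each document
index1 = rdd1.flatMap(lambda kv: [(tok, kv[0]) for tok in set(kv[1].split())])
index2 = rdd2.flatMap(lambda kv: [(tok, kv[0]) for tok in set(kv[1].split())])

# Joining the indices on the token keeps only the (key1, key2) pairs that share
# at least one token, which is what cuts down the number of similarity computations
CommonTokens = (index1.join(index2)                       # (token, (key1, key2))
                      .map(lambda t: ((t[1][0], t[1][1]), [t[0]]))
                      .reduceByKey(lambda a, b: a + b))   # ((key1, key2), [common tokens])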
In Spark, I did the following (a minimal sketch of these two steps is shown after the submit command below):
create a similarity function;
apply a map over CommonTokens with the similarity function defined above.
spark-submit --master yarn-client --executor-cores 3 --executor-memory 20G --driver-memory 20G --driver-cores 12 --queue cku --num-executors 6 run/Join.py &
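To make the two steps concrete, here is a minimal sketch of what the similarity function and the map could look like. None of the names below come from the post: weights1 and weights2 are assumed to be broadcast variables holding per-document {token: TF-IDF weight} dicts, and threshold is the predefined cutoff.

import math

def similarity(pair):
    (key1, key2), tokens = pair
    w1 = weights1.value[key1]   # {token: TF-IDF weight} for the document in table 1
    w2 = weights2.value[key2]   # {token: TF-IDF weight} for the document in table 2
    dot = sum(w1[t] * w2[t] for t in tokens)              # only common tokens contribute
    norm1 = math.sqrt(sum(v * v for v in w1.values()))
    norm2 = math.sqrt(sum(v * v for v in w2.values()))
    return ((key1, key2), dot / (norm1 * norm2))

similarities = CommonTokens.map(similarity)                    # ((key1, key2), similarity)
matches = similarities.filter(lambda kv: kv[1] >= threshold)   # keep pairs above the threshold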
Problem in Spark:
Thanks for any suggestions. (Sorry for my English; feel free to ask for further information if my question is not clear.)
Upvotes: 13
Views: 5742
Reputation: 5155
You likely want to look into Locality Sensitive Hashing. Fortunately, Spark has already done the work for you. This will reduce the number of computations and give you the Euclidean distance between the two vectors (Euclidean vs. cosine). The only real caveat is that you'll have to make all the vectors the same length, but it seems this would give you what you want with less work.
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.dense([1.0, 1.0]),),
         (1, Vectors.dense([1.0, -1.0]),),
         (2, Vectors.dense([-1.0, -1.0]),),
         (3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(4, Vectors.dense([1.0, 0.0]),),
         (5, Vectors.dense([-1.0, 0.0]),),
         (6, Vectors.dense([0.0, 1.0]),),
         (7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.dense([1.0, 0.0])

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",
                                  bucketLength=2.0, numHashTables=3)
model = brp.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
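On the "same length" caveat: one possible way (not part of the code above, and the table/column names are only assumptions based on the question) is to hash the text into fixed-length TF-IDF vectors with Spark ML before the LSH join.

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

# Bring both tables to the same (id, text) shape; table1/table2 are assumed DataFrames
# with the Key1/Value1 and Key2/Value2 columns from the question.
docsA = table1.selectExpr("Key1 as id", "Value1 as text")
docsB = table2.selectExpr("Key2 as id", "Value2 as text")

pipeline = Pipeline(stages=[
    Tokenizer(inputCol="text", outputCol="tokens"),
    HashingTF(inputCol="tokens", outputCol="tf", numFeatures=1 << 18),
    IDF(inputCol="tf", outputCol="features"),
])
model = pipeline.fit(docsA.union(docsB))   # fit the IDF on both corpora
dfA = model.transform(docsA)               # fixed-length 'features' vectors
dfB = model.transform(docsB)
# dfA and dfB can now be fed to BucketedRandomProjectionLSH / approxSimilarityJoin as above.

If you really need cosine rather than Euclidean distance, normalizing the vectors first (e.g. with pyspark.ml.feature.Normalizer) makes the two equivalent for ranking, since for unit vectors ||a - b||^2 = 2 - 2 * cos(a, b).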
Upvotes: 1