Reputation: 11
I am trying to compare two very large dataframes, each with around 10 petabytes of data, in Spark. The execution is throwing out-of-memory issues even after increasing the memory configurations. Could anyone suggest a better alternative to solve this?
Approach I am using:
Upvotes: 1
Views: 357
Reputation: 703
You can use Spark's LSH implementation to hash both dataframes into a lower-dimensional representation with a very strict similarity measure. After hashing, you can perform an approxSimilarityJoin.
Some basic code on how to do this:
from pyspark.ml.feature import MinHashLSH, VectorAssembler
# Assemble the columns you want to compare into a single feature vector
# (note: MinHashLSH treats non-zero entries as set membership)
assembler = VectorAssembler(inputCols=["feature_col1", "feature_col2"], outputCol="features")
df = assembler.transform(df)
df1 = assembler.transform(df1)
# Initialize LSH model
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
# Fit the model and transform data
model = mh.fit(df)
df_hashed = model.transform(df)
df1_hashed = model.transform(df1)
# Approximate join to find potential matches
approx_joined_df = model.approxSimilarityJoin(df_hashed, df1_hashed, 0.1, distCol="JaccardDistance")
# Keep only pairs below a stricter distance threshold, i.e. very similar items
filtered_df = approx_joined_df.filter("JaccardDistance < 0.05")
This method should certainly be better than a straight comparison. However, at the scale you are talking about you will almost certainly need to tune the Spark configuration settings as well, for example along the lines of the sketch below.
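As a rough starting point only (the exact values depend entirely on your cluster and data layout; the numbers here are assumptions, not recommendations), the kind of settings worth experimenting with look like this:
from pyspark.sql import SparkSession
# Illustrative values only -- sizes and partition counts are assumptions,
# tune them for your own cluster and data volume.
spark = (
    SparkSession.builder
    .appName("large-dataframe-comparison")
    .config("spark.sql.shuffle.partitions", "20000")   # many partitions for a very large shuffle
    .config("spark.sql.adaptive.enabled", "true")      # let AQE coalesce/split partitions at runtime
    .config("spark.executor.memory", "32g")
    .config("spark.executor.memoryOverhead", "8g")
    .config("spark.memory.fraction", "0.8")
    .getOrCreate()
)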
Upvotes: 1