Sanku Sireesha

Reputation: 11

Compare very large dataframes

I am trying to compare two very large dataframes in Spark, each with around 10 petabytes of data. The job keeps throwing out-of-memory errors even after increasing the memory configuration. Could anyone suggest a better alternative to solve this?

Approach I am using:

  1. Generate a row_hash for each row of both dataframes
  2. diff = df.select("row_hash").subtract(df1.select("row_hash"))
  3. diff.join(df, ["row_hash"], "inner") to recover the full differing rows (rough sketch below)
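
A minimal PySpark sketch of this approach, assuming both dataframes share the same schema and row_hash is computed over every column (the hash construction here is illustrative):

from pyspark.sql import functions as F

# Build a per-row hash over all columns (assumes df and df1 have the same schema)
cols = df.columns
df_hashed = df.withColumn(
    "row_hash", F.sha2(F.concat_ws("||", *[F.col(c).cast("string") for c in cols]), 256))
df1_hashed = df1.withColumn(
    "row_hash", F.sha2(F.concat_ws("||", *[F.col(c).cast("string") for c in cols]), 256))

# Hashes present in df but missing from df1
diff = df_hashed.select("row_hash").subtract(df1_hashed.select("row_hash"))

# Recover the full rows behind those hashes
diff_rows = diff.join(df_hashed, ["row_hash"], "inner")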

Upvotes: 1

Views: 357

Answers (1)

mamonu

Reputation: 703

You can use Spark's LSH implementation to hash both dataframes into a much lower-dimensional representation. After hashing, you can perform an approxSimilarityJoin with a very strict similarity threshold.

Some basic code on how to do this:

from pyspark.ml.feature import MinHashLSH, VectorAssembler

# Assemble the feature columns into a single vector column
# (MinHashLSH treats the non-zero entries of each vector as members of a set)
assembler = VectorAssembler(inputCols=["feature_col1", "feature_col2"], outputCol="features")
df = assembler.transform(df)
df1 = assembler.transform(df1)

# Initialize the LSH model
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)

# Fit the model on one dataframe and transform both
model = mh.fit(df)
df_hashed = model.transform(df)
df1_hashed = model.transform(df1)

# Approximate join to find candidate matches within a Jaccard distance of 0.1
approx_joined_df = model.approxSimilarityJoin(df_hashed, df1_hashed, 0.1, distCol="JaccardDistance")

# Keep only the pairs that are very, very similar
filtered_df = approx_joined_df.filter("JaccardDistance < 0.05")

This method should scale considerably better than a straight comparison. However, at the scale you are describing you will almost certainly need to tune the Spark configuration as well.
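
As a rough starting point, these are the kinds of knobs that usually matter for huge joins; the exact values below are purely illustrative, not recommendations for your cluster:

from pyspark.sql import SparkSession

# Illustrative settings only; tune for your own cluster and data
spark = (
    SparkSession.builder
    .config("spark.sql.adaptive.enabled", "true")           # adaptive query execution
    .config("spark.sql.shuffle.partitions", "20000")        # more partitions for very large shuffles
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")   # avoid accidental broadcast joins
    .config("spark.executor.memoryOverhead", "8g")          # headroom for off-heap / shuffle buffers
    .getOrCreate()
)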

Upvotes: 1
