Béatrice M.

Reputation: 972

PySpark - Apply function on re-partitioned batches

I am doing fuzzy string matching using MinHashLSH and approxSimilarityJoin on 500 billion pairs. This is too big for my current cluster setup, so I want to run it in batches.

I want to partition the data and run approxSimilarityJoin on each partition iteratively, so that my cluster can handle each piece.

My current function is:

matched_df = model.stages[-1].approxSimilarityJoin(df1, df2, 1.0, "confidence")
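For context, the model comes from a fitted Pipeline along these lines (the column names and parameters are placeholders for my actual setup); model.stages[-1] is the fitted MinHashLSHModel:

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF, MinHashLSH

# Placeholder pipeline: the last stage is the MinHashLSHModel
# that model.stages[-1] refers to after fitting.
pipeline = Pipeline(stages=[
    Tokenizer(inputCol="name", outputCol="tokens"),
    HashingTF(inputCol="tokens", outputCol="vectors"),
    MinHashLSH(inputCol="vectors", outputCol="hashes", numHashTables=3),
])
model = pipeline.fit(df1)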

But I am stuck on how to combine repartition, foreachPartition and approxSimilarityJoin.

I think it should be something like:

df1.repartition(100).foreachPartition(batch : model.stages[-1].approxSimilarityJoin(batch, df2, 1.0, "confidence"))

but I have the wrong syntax. What is the correct syntax for foreachPartition?

Upvotes: 1

Views: 1322

Answers (1)

Daniel

Reputation: 1242

I don't think you can achieve that with foreachPartition. foreachPartition takes a function that runs on the executors and receives the actual rows of each partition, not a DataFrame (it is an action that triggers processing, like .collect or .write, not just a transformation definition). And even if you wanted to rebuild a DataFrame from the rows passed in, that won't work either, because there is no SparkSession available on the worker itself. Conceptually, a DataFrame is not a table but a lazily evaluated definition of a transformation.
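To make that concrete, here is a minimal sketch of what foreachPartition actually hands to your function: a plain Python iterator of Row objects on an executor, with no SparkSession around to turn them back into a DataFrame (the function and variable names are just illustrative):

def handle_partition(rows):
    # 'rows' is a plain Python iterator of pyspark.sql.Row objects and this
    # function runs on a worker; there is no SparkSession here, so you cannot
    # call approxSimilarityJoin, which needs a DataFrame.
    for row in rows:
        pass  # e.g. write each row to an external system

df1.repartition(100).foreachPartition(handle_partition)  # action, returns None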

What you can do, however, is simply split df1 using Spark itself. If there is no key on which you can filter the DataFrame, you can do it with randomSplit, e.g.:

df1.randomSplit((0.1, 0.1, 0.1, 0.1, 0.1), seed=42)

The result of this operation is a list of DataFrames

[DataFrame[date: string, text: string],
 DataFrame[date: string, text: string],
 DataFrame[date: string, text: string],
 DataFrame[date: string, text: string],
 DataFrame[date: string, text: string]]

over which you can iterate using regular Python

dfs = df1.randomSplit((0.1, 0.1, 0.1, 0.1, 0.1), seed=42)
for split_df in dfs:
    matched_df = model.stages[-1].approxSimilarityJoin(split_df, df2, 1.0, "confidence")
    do_something_with(matched_df)

To split your dataset this way into 100 parts, you can generate the weights tuple:

df1.randomSplit(tuple([0.01 for _ in range(100)]), seed=42)
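Putting the pieces together, the batched join could look roughly like this; the Parquet output path is just a placeholder for whatever do_something_with actually does with each result:

splits = df1.randomSplit(tuple([0.01 for _ in range(100)]), seed=42)

for i, part in enumerate(splits):
    matched_df = model.stages[-1].approxSimilarityJoin(part, df2, 1.0, "confidence")
    # Persist each batch before starting the next one, so only one join is in flight at a time
    matched_df.write.mode("overwrite").parquet(f"/tmp/matches/batch={i}")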

Upvotes: 1
