Reputation: 972
I am doing fuzzy string matching using MinHashLSH and approxSimilarityJoin on 500 billion pairs. That is too big for my current cluster setup, so I want to run it in batches. I want to partition the data and run approxSimilarityJoin on each partition iteratively so that my cluster can handle it.
My current function is:
matched_df = model.stages[-1].approxSimilarityJoin(df1, df2, 1.0, "confidence")
But I am stuck on how to combine repartition, foreachPartition, and approxSimilarityJoin.
I think it should be something like:
df1.repartition(100).foreachPartition(batch : model.stages[-1].approxSimilarityJoin(batch, df2, 1.0, "confidence"))
but I have the wrong syntax. What is the correct syntax for foreachPartition?
Upvotes: 1
Views: 1322
Reputation: 1242
I don't think you can achieve that using foreachPartition.
foreachPartition takes a function that runs on the executors and passes actual data into it, not a DataFrame (it is an action that triggers processing, like .collect or .write, not just a transformation definition). And if you wanted to recreate a DataFrame from that passed-in data, it also won't work, because there is no SparkContext available on the worker itself. Conceptually, a DataFrame is not a table but a lazily evaluated definition of a transformation.
What you can do, however, is just split df1 using Spark. If there is no key on which you can filter the DataFrame, you can just do it using randomSplit, e.g.:
df.randomSplit((0.1, 0.1, 0.1, 0.1, 0.1), seed=42)
The result of this operation is a list of DataFrames
[DataFrame[date: string, text: string],
DataFrame[date: string, text: string],
DataFrame[date: string, text: string],
DataFrame[date: string, text: string],
DataFrame[date: string, text: string]]
over which you can iterate using regular Python
dfs = df.randomSplit((0.1, 0.1, 0.1, 0.1, 0.1), seed=42)
for batch_df in dfs:  # iterate over the splits one at a time
    matched_df = model.stages[-1].approxSimilarityJoin(batch_df, df2, 1.0, "confidence")
    do_something_with(matched_df)
To split your dataset this way into 100 parts, you can generate the weights tuple:
df.randomSplit(tuple([0.01 for x in range(100)]), seed=42)
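Putting it together, a hedged sketch of the full batched run, assuming you persist each batch's result (the parquet path and append mode are placeholders, not part of your pipeline):

weights = [0.01 for _ in range(100)]          # 100 roughly equal splits
batches = df1.randomSplit(weights, seed=42)

for batch_df in batches:
    matched_df = model.stages[-1].approxSimilarityJoin(batch_df, df2, 1.0, "confidence")
    matched_df.write.mode("append").parquet("/tmp/matched")  # assumed sink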
Upvotes: 1