I'm trying to fuzzy join two datasets, one of quotes and one of sales. For argument's sake, the joining attributes are first name, surname, dob and email.
I have 26m+ quotes and 1m+ sales. Customers may not have used accurate information for one or more of the attributes, so I'm giving each quote/sale pair a score per attribute: (1,1,1,1) where all four match, down to (0,0,0,0) where none match.
So I end up with something similar to
q1, s1, (0,0,1,0)
q1, s2, (0,1,0,1)
q1, s3, (1,1,1,1)
q2, s1, (1,0,0,1)
...
q26000000, s1, (0,1,0,0)
So I think this is equivalent to a cartesian product, which I'm managing by creating a large number of partitions for the quotes:
val quotesRaw = sc.textFile(....)
val quotes = quotesRaw.repartition(quotesRaw.count().toInt / 100000)
val sales = sc.textFile(...)
val sb = sc.broadcast(sales.collect())

quotes.mapPartitions(p =>
  p.flatMap(q =>
    sb.value.map(s =>
      (q._1, s._1, (if (q._2 == s._2) 1 else 0 /* etc. for the other attributes */))
    )
  )
)
This all works if I keep the numbers low, e.g. 26m quotes but only 1000 sales, but if I run it with all the sales it just stops responding.
I'm running it with the following config.
spark-submit --conf spark.akka.frameSize=1024 \
--conf spark.executor.memory=3g --num-executors=30 \
--driver-memory 6g --class SalesMatch --deploy-mode client \
--master yarn SalesMatching-0.0.1-SNAPSHOT.jar \
hdfs://cluster:8020/data_import/Sales/SourceSales/2014/09/01/SourceSales_20140901.txt \
hdfs://cluster:8020/data_import/CDS/Enquiry/2014/01/01/EnquiryBackFill_20140101.txt \
hdfs://cluster:8020/tmp/_salesdata_matches_new
Is there anything that jumps out as obviously incorrect here?
Upvotes: 0
Views: 1184
Reputation: 330063
Assuming 100k quotes per partition and 1M sales with a total size of 40MB, your code generates roughly 4TB of data per partition, so it is rather unlikely your workers can handle this, and it definitely cannot be done in memory.
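As a quick back-of-envelope check (the ~40 bytes per record is simply 40MB divided by 1M sales):

// Rough size of the intermediate data a single partition produces
val quotesPerPartition = 100000L
val salesCount         = 1000000L // ~1M sales
val bytesPerRecord     = 40L      // ~40MB total / 1M records

val bytesPerPartition = quotesPerPartition * salesCount * bytesPerRecord
// = 4e12 bytes, i.e. roughly 4TB per partition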
I assume you're interested only in close matches, so it makes sense to filter early. Simplifying your code a little (as far as I can tell there is no reason to use mapPartitions here):
// Check if a match is close enough, where T is the type of (q._1, s._1, (...));
// the parameter is named `m` because `match` is a reserved word in Scala
def isCloseMatch(m: T): Boolean = ???

quotes.flatMap(q => sb.value
  .map(s => (q._1, s._1, (....))) // map as before
  .filter(isCloseMatch)           // yield only close matches
)
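For illustration, here is one possible concrete isCloseMatch. The tuple shape follows the scores in the question, while the threshold of 3 matching attributes is purely an assumption:

// Hypothetical concrete type: (quote id, sale id, per-attribute scores)
type Match = (String, String, (Int, Int, Int, Int))

// Keep pairs where at least 3 of the 4 attributes match
// (the threshold of 3 is an assumed value, not from the question)
def isCloseMatch(m: Match): Boolean = {
  val (_, _, (firstName, surname, dob, email)) = m
  firstName + surname + dob + email >= 3
}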
General remarks:
For relatively simple operations like this it could be a good idea to use the high-level Spark SQL API:
import org.apache.spark.sql.DataFrame
val quotesDF: DataFrame = ???
val salesDF: DataFrame = ???
val featureCols: Seq[String] = ???
val threshold: Int = ???
val inds = featureCols // Boolean columns
.map(col => (quotesDF(col) === salesDF(col)).alias(s"${col}_ind"))
val isSimilar = inds // sum(q == s) >= threshold
.map(c => c.cast("integer").alias(c.toString))
.reduce(_ + _)
.geq(threshold)
val combined = quotesDF
.join(salesDF, isSimilar, "left")
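For completeness, a hypothetical usage with toy data — all column names, ids and the threshold below are assumptions for illustration:

import sqlContext.implicits._ // Spark 1.x; on 2.x+ use spark.implicits._

val quotesDF = Seq(
  ("q1", "John", "Smith", "1980-01-01", "john@example.com"),
  ("q2", "Jane", "Doe",   "1975-05-05", "jane@example.com")
).toDF("quote_id", "first_name", "surname", "dob", "email")

val salesDF = Seq(
  ("s1", "John", "Smith", "1980-01-01", "different@example.com")
).toDF("sale_id", "first_name", "surname", "dob", "email")

val featureCols = Seq("first_name", "surname", "dob", "email")
val threshold = 3

// Building isSimilar and combined as above, q1 joins to s1
// (3 of 4 attributes equal) while q2 keeps null sale columns.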
Upvotes: 2