user2046117


Spark map inside flatmap to replicate cartesian join

I'm trying to fuzzy join two datasets, one of quotes and one of sales. For argument's sake, the joining attributes are first name, surname, dob and email.

I have 26m+ quotes and 1m+ sales. Customers may not have used accurate information for one or more of the attributes, so I'm giving each pair a score per attribute, from (1,1,1,1) where all four match down to (0,0,0,0) where none match.

So I end up with something similar to

q1, s1, (0,0,1,0)
q1, s2, (0,1,0,1)
q1, s3, (1,1,1,1)
q2, s1, (1,0,0,1)
...
q26000000, s1, (0,1,0,0)
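
For reference, here's a minimal sketch of the per-pair scoring I have in mind, assuming each record has already been parsed into an (id, firstName, surname, dob, email) tuple (that layout is just for illustration):

// Illustrative scoring only - the real records are parsed from the raw text files
def score(q: (String, String, String, String, String),
          s: (String, String, String, String, String)): (Int, Int, Int, Int) = (
  if (q._2 == s._2) 1 else 0, // first name
  if (q._3 == s._3) 1 else 0, // surname
  if (q._4 == s._4) 1 else 0, // dob
  if (q._5 == s._5) 1 else 0  // email
)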

So I think this is equivalent to a cartesian product, which I'm managing by making a large number of partitions for the quotes:

val quotesRaw = sc.textFile(....)
val quotes = quotesRaw.repartition(quotesRaw.count().toInt / 100000)

val sales = sc.textFile(...)
val sb = sc.broadcast(sales.collect())

quotes.mapPartitions(p =>
   p.flatMap(q =>
      sb.value.map(s =>
         (q._1, s._1, (if (q._2 == s._2) 1 else 0, etc))
      )
   )
)

This all works if I keep the numbers low, e.g. 26m quotes but only 1000 sales, but if I run it with all the sales it just stops responding.

I'm running it with the following config.

spark-submit --conf spark.akka.frameSize=1024 \
--conf spark.executor.memory=3g --num-executors=30 \
--driver-memory 6g --class SalesMatch --deploy-mode client \
--master yarn SalesMatching-0.0.1-SNAPSHOT.jar \
hdfs://cluster:8020/data_import/Sales/SourceSales/2014/09/01/SourceSales_20140901.txt \
hdfs://cluster:8020/data_import/CDS/Enquiry/2014/01/01/EnquiryBackFill_20140101.txt \
hdfs://cluster:8020/tmp/_salesdata_matches_new

Is there anything that jumps out as obviously incorrect here?

Upvotes: 0

Views: 1184

Answers (1)

zero323

Reputation: 330063

Assuming 100k quotes per partition and 1M+ sales with a total size of around 40MB, your code generates roughly 4TB of data per partition, so it is rather unlikely your workers can handle this and it definitely cannot be done in memory.
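
A rough back-of-the-envelope check of that figure (the per-partition quote count and the broadcast size are the assumptions stated above):

// Rough estimate, assuming ~100k quotes per partition and ~40MB of collected sales
val quotesPerPartition = 100000L
val salesBroadcastBytes = 40L * 1024 * 1024            // ~40MB broadcast to every executor
val bytesPerPartition = quotesPerPartition * salesBroadcastBytes
// ≈ 4.2e12 bytes, i.e. roughly 4TB produced by a single partition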

I assume you're interested only in close matches, so it makes sense to filter early. Simplifying your code a little (as far as I can tell there is no reason to use mapPartitions):

// Check if a match is close enough, where T is the type of (q._1, s._1, (...))
def isCloseMatch(m: T): Boolean = ???

quotes.flatMap(q => sb.value
  .map(s => (q._1, s._1, (....))) // Map as before
  .filter(isCloseMatch) // yield only close matches
)
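
For example, assuming the triple is (quoteId, saleId, (firstNameInd, surnameInd, dobInd, emailInd)) and you only want pairs where at least three of the four attributes match, isCloseMatch could look roughly like this:

// Illustrative only - adjust the types and the threshold to your data
def isCloseMatch(m: (String, String, (Int, Int, Int, Int))): Boolean = {
  val (_, _, (fn, sn, dob, email)) = m
  fn + sn + dob + email >= 3 // keep only pairs with at least 3 matching attributes
}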

General remarks:

  • Creating a broadcast variable from an RDD is an expensive process. First you have to transfer all the data to the driver and then distribute it among the workers. It means repeated serialization/deserialization, network traffic and the cost of storing the data
  • For relatively simple operations like this it could be a good idea to use the high-level Spark SQL API, as sketched below (example placeholder values follow the snippet):

    import org.apache.spark.sql.DataFrame
    
    val quotesDF: DataFrame = ???
    val salesDF: DataFrame = ???
    val featureCols: Seq[String] = ???
    val threshold: Int = ???
    
    val inds = featureCols // Boolean columns 
      .map(col => (quotesDF(col) === salesDF(col)).alias(s"${col}_ind"))
    
    val isSimilar = inds // sum(q == s) >= threshold
      .map(c => c.cast("integer").alias(c.toString))
      .reduce(_ + _)
      .geq(threshold)
    
    val combined = quotesDF
        .join(salesDF, isSimilar, "left")
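
To make the sketch above concrete, the placeholders could be filled in along these lines (the column names and threshold are assumptions based on the attributes in the question):

    // Hypothetical values for the ??? placeholders above; adjust to the real schema
    val featureCols: Seq[String] = Seq("first_name", "surname", "dob", "email")
    val threshold: Int = 3 // require at least 3 of the 4 attributes to match

    // quotesDF and salesDF would be built from the raw text files,
    // e.g. by splitting each line and converting the resulting RDD to a DataFrame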
    

Upvotes: 2
