Judking

Reputation: 6371

join in Spark outputs wrong result whereas map-side join is correct

My Spark version is 1.2.0, and here's the scenario:

There are two RDDs, namely RDD_A and RDD_B, both with the structure RDD[(spid, the_same_spid)]. RDD_A has 20,000 lines whereas RDD_B has 3,000,000,000. I intend to count the lines of RDD_B whose 'spid' exists in RDD_A.

My first implementation is the mainstream one, calling the join method on RDD_B against RDD_A:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val currentDay = args(0)

val conf = new SparkConf().setAppName("Spark-MonitorPlus-LogStatistic")
val sc = new SparkContext(conf)

//---RDD A transforming to RDD[(spid, spid)]---
val spidRdds = sc.textFile("/diablo/task/spid-date/" + currentDay + "-spid-media")
  .map(line => line.split(",")(0).trim)
  .map(spid => (spid, spid))
  .partitionBy(new HashPartitioner(32));
val logRdds: RDD[(LongWritable, Text)] = MzFileUtils.getFileRdds(sc, currentDay, "")
val logMapRdds = MzFileUtils.mapToMzlog(logRdds)

//---RDD B transforming to RDD[(spid, spid)]---
val tongYuanRdd = logMapRdds
  .filter(kvs => kvs("plt") == "0" && kvs("tp") == "imp")
  .map(kvs => kvs("p").trim)
  .map(spid => (spid, spid))
  .partitionBy(new HashPartitioner(32));

//---join---
val filteredTongYuanRdd = tongYuanRdd.join(spidRdds);
println("Total TongYuan Imp: " + filteredTongYuanRdd.count())

However, the result is incorrect (larger than expected) when compared to Hive's. When I change the reduce-side join to a map-side join as below, the result is exactly the same as Hive's:

val conf = new SparkConf().setAppName("Spark-MonitorPlus-LogStatistic")
val sc = new SparkContext(conf)

//---RDD A transforming to RDD[(spid, spid)]---
val spidRdds = sc.textFile("/diablo/task/spid-date/" + currentDay + "-spid-media")
  .map(line => line.split(",")(0).trim)
  .map(spid => (spid, spid))
  .partitionBy(new HashPartitioner(32));
val logRdds: RDD[(LongWritable, Text)] = MzFileUtils.getFileRdds(sc, currentDay, "")
val logMapRdds = MzFileUtils.mapToMzlog(logRdds)

//---RDD B transforming to RDD[(spid, spid)]---
val tongYuanRdd = logMapRdds
  .filter(kvs => kvs("plt") == "0" && kvs("tp") == "imp")
  .map(kvs => kvs("p").trim)
  .map(spid => (spid, spid))
  .partitionBy(new HashPartitioner(32));

//---join---
val globalSpids = sc.broadcast(spidRdds.collectAsMap());
val filteredTongYuanRdd = tongYuanRdd.mapPartitions({
  iter =>
    val m = globalSpids.value
    for {
      (spid, spid_cp) <- iter
      if m.contains(spid)
    } yield spid
}, preservesPartitioning = true);
println("Total TongYuan Imp: " + filteredTongYuanRdd.count())

As you can see, the only difference between the two code snippets above is the 'join' part.

So, are there any suggestions for addressing this problem? Thanks in advance!

Upvotes: 0

Views: 622

Answers (1)

Holden

Reputation: 7452

Spark's join doesn't enforce uniqueness of keys, and when a key is duplicated it actually outputs the cross product for that key. Using cogroup and only outputting one k/v pair for each key, or mapping to just the ids and then using intersection, will do the trick.
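For example, here is a rough sketch of both alternatives, reusing the spidRdds and tongYuanRdd definitions from your question (names taken from your code; adjust as needed):

// Alternative 1: cogroup, then emit at most one record per key that appears
// on both sides, so duplicate keys can no longer produce a cross product.
val matchedKeys = tongYuanRdd.cogroup(spidRdds).flatMap {
  case (spid, (bVals, aVals)) =>
    if (bVals.nonEmpty && aVals.nonEmpty) Some((spid, spid)) else None
}
println("Matched spids via cogroup: " + matchedKeys.count())

// Alternative 2: reduce both sides to plain ids and intersect them.
// RDD.intersection de-duplicates, so this also yields one entry per matching key.
val matchedIds = tongYuanRdd.keys.intersection(spidRdds.keys)
println("Matched spids via intersection: " + matchedIds.count())

Note that both sketches count each matching spid once; if you want every line of RDD_B counted (as your broadcast version does), de-duplicating spidRdds (e.g. spidRdds.distinct()) before the join is another option.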

Upvotes: 1
