Reputation: 6371
My Spark version is 1.2.0, and here's the scenario:
There are two RDDs, RDD_A and RDD_B, both of type RDD[(spid, the_same_spid)]. RDD_A has 20,000 lines, whereas RDD_B has 3,000,000,000 lines. I want to count the lines of RDD_B whose 'spid' exists in RDD_A.
My first implementation is the mainstream one, calling the join method on RDD_B with RDD_A:
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits (join, partitionBy) on Spark 1.2
import org.apache.spark.rdd.RDD

val currentDay = args(0)
val conf = new SparkConf().setAppName("Spark-MonitorPlus-LogStatistic")
val sc = new SparkContext(conf)
//---RDD A transforming to RDD[(spid, spid)]---
val spidRdds = sc.textFile("/diablo/task/spid-date/" + currentDay + "-spid-media")
  .map(line => line.split(",")(0).trim)
  .map(spid => (spid, spid))
  .partitionBy(new HashPartitioner(32))
val logRdds: RDD[(LongWritable, Text)] = MzFileUtils.getFileRdds(sc, currentDay, "")
val logMapRdds = MzFileUtils.mapToMzlog(logRdds)
//---RDD B transforming to RDD[(spid, spid)]---
val tongYuanRdd = logMapRdds
  .filter(kvs => kvs("plt") == "0" && kvs("tp") == "imp")
  .map(kvs => kvs("p").trim)
  .map(spid => (spid, spid))
  .partitionBy(new HashPartitioner(32))
//---join---
val filteredTongYuanRdd = tongYuanRdd.join(spidRdds)
println("Total TongYuan Imp: " + filteredTongYuanRdd.count())
However, the result is incorrect: it is larger than the one Hive gives. When I change the join from a reduce-side join to a map-side join, as below, the result matches Hive's exactly:
val conf = new SparkConf().setAppName("Spark-MonitorPlus-LogStatistic")
val sc = new SparkContext(conf)
//---RDD A transforming to RDD[(spid, spid)]---
val spidRdds = sc.textFile("/diablo/task/spid-date/" + currentDay + "-spid-media")
  .map(line => line.split(",")(0).trim)
  .map(spid => (spid, spid))
  .partitionBy(new HashPartitioner(32))
val logRdds: RDD[(LongWritable, Text)] = MzFileUtils.getFileRdds(sc, currentDay, "")
val logMapRdds = MzFileUtils.mapToMzlog(logRdds)
//---RDD B transforming to RDD[(spid, spid)]---
val tongYuanRdd = logMapRdds
  .filter(kvs => kvs("plt") == "0" && kvs("tp") == "imp")
  .map(kvs => kvs("p").trim)
  .map(spid => (spid, spid))
  .partitionBy(new HashPartitioner(32))
//---join---
val globalSpids = sc.broadcast(spidRdds.collectAsMap())
val filteredTongYuanRdd = tongYuanRdd.mapPartitions({ iter =>
  val m = globalSpids.value
  for {
    (spid, spid_cp) <- iter
    if m.contains(spid)
  } yield spid
}, preservesPartitioning = true)
println("Total TongYuan Imp: " + filteredTongYuanRdd.count())
As you can see, the only difference between the two code snippets above is the 'join' part.
So, are there any suggestions for addressing this problem? Thanks in advance!
Upvotes: 0
Views: 622
Reputation: 7452
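Spark's join doesn't enforce uniqueness of keys; when a key is duplicated, it outputs the cross product of the values for that key. So if a spid appears more than once in RDD_A, every matching line of RDD_B is counted once per occurrence, whereas the broadcast version is unaffected because collectAsMap keeps a single entry per key. Using cogroup and outputting only one k/v pair for each key, or mapping to just the ids and then using intersection, will do the trick.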
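To make the effect concrete, here is a minimal sketch run in local mode. The object name, the `counts` helper, and the tiny in-memory data sets are all made up for illustration; they stand in for RDD_A and RDD_B and assume that the spid file contains at least one repeated spid. One caveat: `intersection` returns distinct elements, so it would count matching spids rather than matching lines. Since the question asks for a line count, the sketch instead deduplicates the small side before joining, and also shows a cogroup variant that keeps the large side's lines only when the key occurs in the small side.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits, needed on Spark 1.2

object JoinDuplicateKeysSketch {

  // Returns (plain join count, join after distinct, cogroup-based count).
  def counts(sc: SparkContext): (Long, Long, Long) = {
    // Hypothetical stand-in for RDD_A: spid "a" is listed twice.
    val small = sc.parallelize(Seq("a", "a", "b")).map(spid => (spid, spid))
    // Hypothetical stand-in for RDD_B: three log lines for "a", one for "c".
    val large = sc.parallelize(Seq("a", "a", "a", "c")).map(spid => (spid, spid))

    // Plain join: each "a" line in large pairs with both "a" entries in small,
    // so 3 lines produce 3 * 2 = 6 output records.
    val inflated = large.join(small).count()

    // Deduplicate the small side first, so each line in large matches at most once.
    val viaDistinct = large.join(small.distinct()).count()

    // cogroup variant: emit the lines from large only when the key occurs in small.
    val viaCogroup = large.cogroup(small).flatMap {
      case (_, (largeVals, smallVals)) =>
        if (smallVals.nonEmpty) largeVals else Iterable.empty[String]
    }.count()

    (inflated, viaDistinct, viaCogroup)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("join-duplicate-keys-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (inflated, viaDistinct, viaCogroup) = counts(sc)
      println(s"plain join: $inflated, distinct then join: $viaDistinct, cogroup: $viaCogroup")
    } finally {
      sc.stop()
    }
  }
}
```

Applied to the code in the question, the smallest change along these lines would be calling `distinct()` on `spidRdds` before the join. With only 20,000 entries on the small side, the broadcast approach from the question is probably still the cheaper option, since it avoids shuffling the 3,000,000,000-line RDD.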
Upvotes: 1