Reputation: 33
I have two RDDs of type RDD[(String, Int)]. I'd like to subtract the integer values of entries that share the same key.
Here's an example. If the input RDDs are
valid_record = (TcustomerTDL_2016266,16)
deleted_record = (TcustomerTDL_2016266,8)
then, because the keys are the same, the integer values have to be subtracted. The expected result is (TcustomerTDL_2016266,8), since 16 - 8 = 8. I tried using subtractByKey, but it doesn't seem to work.
I used the following code:
val changes_total = valid_record.subtractByKey(deleted_record)
Let me know if there is an alternative way to do this or if this is incorrect.
Here is the code:
val Conf = new SparkConf().setAppName("Module").setMaster("local")
val sc = new SparkContext(Conf)
// Read every file in the directory as a (path, content) pair
val incoming_file = sc.wholeTextFiles("D:/Users/Documents/siva_hourly") //changed code
// Key by the path segment at index 6 and split the content into lines
val output = incoming_file.map{case(k,v) => (k.split("/")(6),v.split("\\r?\\n"))}
output.cache()
// Keep only the third \001-delimited field (the change type) of each line
val change_type = output.map{case (k,v) => (k,(v.toList.map( x => x.split("\001")(2))))} //changed code
// Count the deleted records ("D") per file
val change_delete_count = change_type.map{case(k,v) => (k,(v.filter{ x => x == "D" }).length)}
// Shorten the key to its first two "_"-separated parts
val change_record_foreach4 = change_delete_count.map{case(k,v) => (k.split("_"),v)}
val change_record_foreach3 = change_record_foreach4.map{case(k,v)=>(k(0)+'_'+k(1),v)}
// Count the valid records ("A" or "I") per file and shorten the key the same way
val change_valid_count = change_type.map{case(k,v) => (k,(v.filter{ x => x =="A" || x == "I"}).length)}
val change_record_foreach = change_valid_count.map{case(k,v) => (k.split("_"),v)}
val change_record_foreach1 = change_record_foreach.map{case(k,v)=>(k(0)+'_'+k(1),v)}
// Sum the counts per shortened key
val valid_record = change_record_foreach1.reduceByKey((x, y) => x + y)
val deleted_record = change_record_foreach3.reduceByKey((x, y) => x + y)
val changes_total = valid_record.subtractByKey(deleted_record)
Upvotes: 2
Views: 6453
Reputation: 2099
This is not the correct usage of subtractByKey. It removes every element whose key is present in the other RDD; it does not subtract the values.
Here is an example of how subtractByKey works.
Suppose you have two pair RDDs:
rdd = {(1, 2), (3, 4), (3, 6)}
other = {(3, 9)}
rdd.subtractByKey(other)
The result is:
{(1, 2)}
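The same behaviour can be checked with a small runnable sketch. The object name SubtractByKeyDemo and the helper dropMatchingKeys are made up for illustration, and a local master is assumed. Note that with the data from the question, where both RDDs hold the same key, subtractByKey would return an empty RDD.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical demo object; the names are not part of the Spark API.
object SubtractByKeyDemo {
  // Keeps only the pairs of `left` whose key does NOT appear in `right`.
  // The values stored in `right` are ignored entirely.
  def dropMatchingKeys(left: RDD[(Int, Int)], right: RDD[(Int, Int)]): RDD[(Int, Int)] =
    left.subtractByKey(right)

  def main(args: Array[String]): Unit = {
    // Assumption: run locally, just for the demonstration.
    val conf = new SparkConf().setAppName("SubtractByKeyDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(Seq((1, 2), (3, 4), (3, 6)))
      val other = sc.parallelize(Seq((3, 9)))
      // Both pairs with key 3 are dropped, so only (1,2) remains.
      dropMatchingKeys(rdd, other).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```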
You can do it like this instead: join the two RDDs on the key, then subtract the paired values.
val joinRDD = valid_record.join(deleted_record)
val resultRDD = joinRDD.mapValues(x => x._1 - x._2)
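One thing to keep in mind: join is an inner join, so a key that appears in only one of the two RDDs is dropped from the result. In the question's code both RDDs are derived from the same files, so they should share the same keys and the inner join is enough. If that is not guaranteed, a variant based on leftOuterJoin keeps every key of the first RDD. Below is a minimal sketch of that variant; the object name SubtractCounts, the helper subtractCounts, and the second key TorderTDL_2016266 are hypothetical and only there to show the unmatched case.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object; the names are not part of the Spark API.
object SubtractCounts {
  // For every key in `valid`, subtract the count stored under the same key
  // in `deleted`. A key with no entry in `deleted` is treated as 0 deletions.
  def subtractCounts(valid: RDD[(String, Int)],
                     deleted: RDD[(String, Int)]): RDD[(String, Int)] =
    valid.leftOuterJoin(deleted).mapValues { case (v, d) => v - d.getOrElse(0) }

  def main(args: Array[String]): Unit = {
    // Assumption: run locally, just for the demonstration.
    val conf = new SparkConf().setAppName("SubtractCounts").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val valid_record = sc.parallelize(Seq(
        ("TcustomerTDL_2016266", 16),
        ("TorderTDL_2016266", 5)        // made-up key with no deletions
      ))
      val deleted_record = sc.parallelize(Seq(("TcustomerTDL_2016266", 8)))
      // Sorted by key so the printed order is stable.
      subtractCounts(valid_record, deleted_record)
        .collect().sortBy(_._1).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Keys that exist only in deleted_record are still ignored by this version; a fullOuterJoin with a default on both sides would be needed to keep those as well.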
Upvotes: 6