Siva

Reputation: 33

Spark, Scala: How to subtract the values in pair RDDs based on their keys?

I have two RDDs of type RDD[(String, Int)]. I'd like to subtract the integer values of entries that share the same key.

Here's an example: If the input RDDs are

valid_record   = (TcustomerTDL_2016266,16)
deleted_record = (TcustomerTDL_2016266,8)

Since the keys are the same, the integer values should be subtracted, so the expected result is (TcustomerTDL_2016266,8), which is 16 - 8 = 8. I tried using subtractByKey, but it doesn't produce that result.

I used the following code:

val changes_total = valid_record.subtractByKey(deleted_record)

Let me know if there is an alternative way to do this or if this is incorrect.

Here is the full code:

import org.apache.spark.{SparkConf, SparkContext}

val Conf = new SparkConf().setAppName("Module").setMaster("local")
val sc = new SparkContext(Conf)
val incoming_file = sc.wholeTextFiles("D:/Users/Documents/siva_hourly") //changed code
val output = incoming_file.map{case(k,v) => (k.split("/")(6),v.split("\\r?\\n"))} 
output.cache()
val change_type = output.map{case (k,v) => (k,(v.toList.map( x => x.split("\001")(2))))} //changed code
val change_delete_count = change_type.map{case(k,v) => (k,(v.filter{ x => x == "D" }).length)}
val change_record_foreach4 = change_delete_count.map{case(k,v) => (k.split("_"),v)} 
val change_record_foreach3 = change_record_foreach4.map{case(k,v)=>(k(0)+'_'+k(1),v)}
val change_valid_count = change_type.map{case(k,v) => (k,(v.filter{ x => x =="A" || x == "I"}).length)}
val change_record_foreach = change_valid_count.map{case(k,v) => (k.split("_"),v)}   
val change_record_foreach1 = change_record_foreach.map{case(k,v)=>(k(0)+'_'+k(1),v)}
val valid_record = change_record_foreach1.reduceByKey((x, y) => x + y)
val deleted_record = change_record_foreach3.reduceByKey((x, y) => x + y)
val changes_total = valid_record.subtractByKey(deleted_record)

Upvotes: 2

Views: 6453

Answers (1)

Arunakiran Nulu

Reputation: 2099

This is not the correct usage of subtractByKey. It does not subtract values: it returns the pairs of the first RDD whose keys do not appear in the second RDD.

Here is an example of how subtractByKey works.

Suppose you have two pair RDDs, rdd = {(1, 2), (3, 4), (3, 6)} and other = {(3, 9)}:

rdd.subtractByKey(other)

The result is:

{(1, 2)}
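The behaviour above can be reproduced with a small local sketch. The app name and the local[1] master are arbitrary choices for illustration, and SparkContext.getOrCreate is used so the snippet also works where a context already exists (for example in spark-shell).

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Local context for illustration only; app name and master are assumptions.
val conf = new SparkConf().setAppName("SubtractByKeyDemo").setMaster("local[1]")
val sc = SparkContext.getOrCreate(conf)

val rdd = sc.parallelize(Seq((1, 2), (3, 4), (3, 6)))
val other = sc.parallelize(Seq((3, 9)))

// Keeps only the pairs of rdd whose key is absent from other.
// The values stored in other (here 9) play no role at all.
val remaining = rdd.subtractByKey(other).collect().toList

println(remaining) // List((1,2))
```

Both pairs with key 3 are dropped because key 3 exists in other, which is why this method cannot produce 16 - 8 = 8 for a shared key.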

To subtract the values, join the two RDDs on the key and then subtract within each joined pair:

val joinRDD = valid_record.join(deleted_record)
val resultRDD = joinRDD.mapValues(x => x._1 - x._2)
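As a minimal end-to-end sketch of the join approach, the snippet below uses the sample key from the question plus one hypothetical key, TorderTDL_2016266, that has no deletions. Note that join is an inner join, so keys present in only one RDD are dropped; if every valid key should be kept, leftOuterJoin with a default of 0 is one possible variant (an assumption about the desired behaviour, not something the question states).

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Local context for illustration only; app name and master are assumptions.
val conf = new SparkConf().setAppName("SubtractValuesDemo").setMaster("local[1]")
val sc = SparkContext.getOrCreate(conf)

// "TorderTDL_2016266" is a made-up key added to show the unmatched case.
val validRecord = sc.parallelize(Seq(("TcustomerTDL_2016266", 16), ("TorderTDL_2016266", 5)))
val deletedRecord = sc.parallelize(Seq(("TcustomerTDL_2016266", 8)))

// Inner join: only keys present in both RDDs survive.
val matchedOnly = validRecord
  .join(deletedRecord)
  .mapValues { case (valid, deleted) => valid - deleted }
  .collect()
  .toMap

// Left outer join: every key of validRecord survives; a missing delete count is treated as 0.
val allValidKeys = validRecord
  .leftOuterJoin(deletedRecord)
  .mapValues { case (valid, deleted) => valid - deleted.getOrElse(0) }
  .collect()
  .toMap

println(matchedOnly)  // Map(TcustomerTDL_2016266 -> 8)
println(allValidKeys) // contains TcustomerTDL_2016266 -> 8 and TorderTDL_2016266 -> 5
```

Which variant fits depends on whether a file with no "D" rows should still appear in changes_total.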

Upvotes: 6
