I have two RDDs that are pulled in with the following code:
val fileA = sc.textFile("fileA.txt")
val fileB = sc.textFile("fileB.txt")
I then map and reduce each of them by key:
val countsB = fileB.flatMap(line => line.split("\n"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
val countsA = fileA.flatMap(line => line.split("\n"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
I now want to find and remove all keys in countsB if the key exists in countsA.
I have tried something like:
countsB.keys.foreach(b => {
  if (countsB.collect().exists(_ == b)) {
    countsB.collect().drop(countsB.collect().indexOf(b))
  }
})
but it doesn't seem to remove them by key.
Upvotes: 1
Views: 6907
Reputation: 37852
There are three issues with your suggested code:
1. You are collect-ing the RDDs, which means they are not RDDs anymore: they are copied into the driver application's memory as plain Scala collections, so you lose Spark's parallelism and risk OutOfMemory errors if your dataset is large.
2. When you call drop on an immutable Scala collection (or an RDD), you don't change the original collection; you get a new collection with those records dropped, so you can't expect the original to change (see the sketch after this list).
3. You cannot access an RDD from within a function passed to any of the RDD higher-order methods (e.g. foreach in this case). Any function passed to these methods is serialized and sent to the workers, and RDDs are (intentionally) not serializable: it would make no sense to fetch them into driver memory, serialize them, and send them back to the workers when the data is already distributed on the workers!
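For the second point, here is a minimal sketch in plain Scala (no Spark needed) showing that drop leaves the original collection untouched:
val original = List(("a", 1), ("b", 2), ("c", 3))
val dropped = original.drop(1)
// dropped is List((b,2), (c,3)); original is unchanged, so calling
// drop and discarding its result, as your snippet does, has no effect
println(original) // List((a,1), (b,2), (c,3))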
To solve all of these: when you want to use one RDD's data to transform/filter another one, you usually want some type of join. In this case you can do:
// left join, and keep only records for which there was NO match in countsA:
countsB.leftOuterJoin(countsA).collect { case (key, (valueB, None)) => (key, valueB) }
NOTE that the collect I'm using here isn't the collect you used: this one takes a PartialFunction as an argument, behaves like a combination of map and filter, and, most importantly, it doesn't copy all the data into driver memory.
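If it helps to see what that is equivalent to, here's a sketch of the same logic written with an explicit filter and map (same result, just more verbose):
// keep pairs that had no match in countsA, then drop the None
countsB.leftOuterJoin(countsA)
  .filter { case (_, (_, matchInA)) => matchInA.isEmpty }
  .map { case (key, (valueB, _)) => (key, valueB) }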
EDIT: as The Archetypal Paul commented, you have a much shorter and nicer option, subtractByKey:
countsB.subtractByKey(countsA)
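A minimal end-to-end sketch with made-up counts (the values here are assumptions, just to illustrate which keys survive):
val countsA = sc.parallelize(Seq(("apple", 2), ("pear", 1)))
val countsB = sc.parallelize(Seq(("apple", 5), ("plum", 3)))
// only the keys of countsB that never appear in countsA are kept
countsB.subtractByKey(countsA).collect() // Array((plum,3))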
Upvotes: 3