user3316920
user3316920

Reputation:

RDD Remove elements by key

I have 2 RDD's that are pulled in with the following code:

val fileA = sc.textFile("fileA.txt")
val fileB = sc.textFile("fileB.txt")

I then Map and Reduce it by key:

val countsB = fileB.flatMap(line => line.split("\n"))
  .map(word => (word, 1))
  .reduceByKey(_+_)

val countsA = fileA.flatMap(line => line.split("\n"))
  .map(word => (word, 1))
  .reduceByKey(_+_)

I now wan't to find and remove all keys in countB if the key exist in countA

I have tried something like:

countsB.keys.foreach(b => {
  if(countsB.collect().exists(_ == b)){
    countsB.collect().drop(countsB.collect().indexOf(b))
  }
})

but it doesn't seem like it removes them by the key.

Upvotes: 1

Views: 6907

Answers (1)

Tzach Zohar
Tzach Zohar

Reputation: 37852

There are 3 issues with your suggested code:

  1. You are collecting the RDDs, which means they are not RDDs anymore, they are copied into the driver application's memory as plain Scala collections, so you lose Spark's parallelism and risk OutOfMemory errors in case your dataset is large

  2. When calling drop on an immutable Scala collection (or an RDD), you don't change the original collection, you get a new collection with those records dropped, so you can't expect original collection to change

  3. You cannot access an RDD within a function passed to any of the RDDs higher-order methods (e.g. foreach in this case) - any function passed to these method is serialized and sent to workers, and RDDs are (intentionally) not serializable - it makes no sense to fetch them into driver memory, serialize them, and send back to workers - the data is already distributed on the workers!

To solve all these - when you want to use one RDD's data to transform/filter another one, you usually want to use some type of join. In this case you can do:

// left join, and keep only records for which there was NO match in countsA:
countsB.leftOuterJoin(countsA).collect { case (key, (valueB, None)) => (key, valueB) }

NOTE that this collect that I'm using here isn't the collect you used - this one takes a PartialFunction as an argument, and behaves like a combination of map and filter, and most importantly: it doesn't copy all data into driver memory.

EDIT: as The Archetypal Paul commented - you have a much shorter and nicer option - subtractByKey:

countsB.subtractByKey(countsA)

Upvotes: 3

Related Questions