kamalbanga

Why does map on an RDD throw NotSerializableException while foreach doesn't?

I understand the basic difference between map and foreach (lazy vs. eager), and I also understand why this code snippet

sc.makeRDD(Seq("a", "b")).map(s => new java.io.ByteArrayInputStream(s.getBytes)).collect

throws

java.io.NotSerializableException: java.io.ByteArrayInputStream

By the same reasoning, I would expect the following snippet to fail as well:

sc.makeRDD(Seq("a", "b")).foreach(s => {
  val is = new java.io.ByteArrayInputStream(s.getBytes)
  println("is = " + is)
})

But this code runs fine. Why?

Answers (2)

user10439725

The collect call after map is what causes the issue. Below are the results of my testing in spark-shell.

The following passes, because count only sends a number back to the driver, not the records themselves:

import org.apache.hadoop.io.NullWritable
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).count

The following calls fail, because the map output has to be serialized and sent to the driver:

sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).first
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).collect

repartition forces a shuffle, which distributes (and therefore serializes) the records across nodes, so this fails:

sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).repartition(2).saveAsTextFile("/tmp/NWRepart")

Without repartition, the following call passes, since each record is converted to text and written out on the executor that produced it:

sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).saveAsTextFile("/tmp/NW")
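A rough way to see this without a cluster is a toy model, assuming Spark's default Java serialization (not Kryo). `ActionModel`, `countAction` and `collectAction` are hypothetical names, not Spark APIs: a count-like action only ships a `Long` back, while a collect-like action ships every record.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical toy model, not Spark's API: work done on a "partition" stays
// local, and only what an action returns is serialized for the driver.
object ActionModel {
  // Plain Java serialization; throws java.io.NotSerializableException for
  // values that are not Serializable (assumes Spark's default JavaSerializer).
  def serialize(value: Any): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(value.asInstanceOf[AnyRef])
    } finally {
      out.close()
    }
    bytes.toByteArray
  }

  // count-like action: records are consumed locally, only a Long is shipped.
  def countAction[T](partition: Iterator[T]): Array[Byte] =
    serialize(partition.size.toLong)

  // collect-like action: every record itself has to be shipped.
  def collectAction[T](partition: Iterator[T]): Array[Byte] =
    serialize(partition.toVector)
}
```

With a partition of `ByteArrayInputStream`s, `countAction` succeeds while `collectAction` throws `java.io.NotSerializableException`, mirroring the count vs. collect results above.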

zero323

Actually, the fundamental difference between map and foreach is not the evaluation strategy. Let's take a look at the signatures (I've omitted the implicit part of map for brevity):

def map[U](f: (T) ⇒ U): RDD[U]
def foreach(f: (T) ⇒ Unit): Unit 
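For reference, Spark's source declares `RDD.map` as `def map[U: ClassTag](f: T => U): RDD[U]`; the `ClassTag` context bound is the implicit part omitted above. The `LocalRDD` below is a hypothetical local stand-in backed by a `Seq`, not Spark code, that simply mirrors both shapes:

```scala
import scala.reflect.ClassTag

// Hypothetical local stand-in for RDD[T] (not Spark code) that mirrors the
// shapes of RDD.map (with its implicit ClassTag) and RDD.foreach.
final case class LocalRDD[T](data: Seq[T]) {
  // map builds a new dataset of U values; in Spark these values may later
  // have to be serialized (e.g. for a shuffle or a collect).
  def map[U: ClassTag](f: T => U): LocalRDD[U] = LocalRDD(data.map(f))

  // foreach runs f only for its side effects and produces no new dataset.
  def foreach(f: T => Unit): Unit = data.foreach(f)
}
```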

map takes a function from T to U, applies it to each element of the existing RDD[T], and returns an RDD[U]. To allow operations like shuffling, or collecting the results to the driver as in your example, U has to be serializable.
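To see why `U = java.io.ByteArrayInputStream` is a problem, here is a minimal check with plain Java serialization, assuming Spark's default `JavaSerializer` (with Kryo configured, the failure would look different). `JavaSerializationCheck` and `trySerialize` are illustrative names:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object JavaSerializationCheck {
  // Try to serialize a value with plain Java serialization, roughly what
  // Spark's default JavaSerializer does with task results.
  // Right(size in bytes) on success, Left(class name) if not serializable.
  def trySerialize(value: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(value)
      out.flush()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    println(trySerialize("a"))                                    // a String is Serializable
    println(trySerialize(new ByteArrayInputStream("a".getBytes))) // a stream is not
  }
}
```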

foreach takes a function from T to Unit (analogous to Java void) and by itself returns nothing. Everything happens locally on the executors and no results are sent over the network, so the values created inside the function never have to be serialized (only the function itself is shipped to the executors). Unlike map, foreach should be used when you want some kind of side effect, as in your previous question.
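A sketch of what does get shipped in the foreach case, again assuming plain Java serialization: only the function itself, which creates the stream inside its body rather than capturing one. `ForeachClosureCheck` and `serializedSize` are hypothetical names:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream}

object ForeachClosureCheck {
  // The function passed to foreach in the question. The stream is created
  // inside the body, so it is never part of the closure itself.
  val sideEffect: String => Unit = s => {
    val is = new ByteArrayInputStream(s.getBytes)
    println("is = " + is)
  }

  // Plain Java serialization, standing in for shipping a closure to executors.
  def serializedSize(value: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(value)
    } finally {
      out.close()
    }
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    // Scala function literals are serializable, and this one captures nothing.
    println(serializedSize(sideEffect))
    // Running it "on the executor" returns Unit: nothing has to be shipped back.
    Seq("a", "b").foreach(sideEffect)
  }
}
```

`serializedSize(sideEffect)` succeeds because the closure captures nothing non-serializable; the `ByteArrayInputStream` only ever exists inside a single call.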

On a side note, the two functions you pass are actually different. The anonymous function you use in map has the type:

(s: String) => java.io.ByteArrayInputStream

while the one you use in foreach has the type:

(s: String) => Unit

If you pass the second function to map, your code will compile, but the result will be far from what you want (an RDD[Unit]).
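The same type mismatch can be seen with ordinary Scala collections, no Spark required; `UnitMapDemo` and `printStream` are illustrative names, and a `Seq` stands in for the RDD:

```scala
import java.io.ByteArrayInputStream

object UnitMapDemo {
  // The function from the question's foreach: its type is String => Unit.
  val printStream: String => Unit = s => {
    val is = new ByteArrayInputStream(s.getBytes)
    println("is = " + is)
  }

  def main(args: Array[String]): Unit = {
    // Compiles, but the result only holds Unit values (the Seq analogue of RDD[Unit]).
    val viaMap: Seq[Unit] = Seq("a", "b").map(printStream)
    println(viaMap) // prints List((), ())
    // The intended use: run the side effect and keep nothing.
    Seq("a", "b").foreach(printStream)
  }
}
```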
