Reputation: 2011
I understand the basic difference between map and foreach (lazy and eager), and I also understand why this code snippet
sc.makeRDD(Seq("a", "b")).map(s => new java.io.ByteArrayInputStream(s.getBytes)).collect
should give
java.io.NotSerializableException: java.io.ByteArrayInputStream
I would then expect the following code snippet to fail in the same way:
sc.makeRDD(Seq("a", "b")).foreach(s => {
val is = new java.io.ByteArrayInputStream(s.getBytes)
println("is = " + is)
})
But this code runs fine. Why so?
Upvotes: 2
Views: 381
Reputation: 147
The collect call after map is causing the issue.
Below are the results of my testing in spark-shell.
The call below passes, as no mapped data has to be sent to other nodes (only the count is returned).
import org.apache.hadoop.io.NullWritable
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).count
The calls below fail, because the map output has to be serialized and sent out of the executor (for first and collect, back to the driver).
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).first
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).collect
repartition forces a shuffle that distributes the data across nodes, so the call below fails as well.
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).repartition(2).saveAsTextFile("/tmp/NWRepart")
Without repartition, the call below passes.
sc.makeRDD(1 to 1000, 1).map(_ => {NullWritable.get}).saveAsTextFile("/tmp/NW")
Upvotes: 0
Reputation: 330373
Actually, the fundamental difference between map and foreach is not the evaluation strategy. Let's take a look at the signatures (I've omitted the implicit part of map for brevity):
def map[U](f: (T) ⇒ U): RDD[U]
def foreach(f: (T) ⇒ Unit): Unit
map takes a function from T to U, applies it to each element of the existing RDD[T], and returns RDD[U]. To allow operations like shuffling, U has to be serializable.
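To see which element types would satisfy that requirement, here is a minimal sketch that needs no Spark at all. It assumes Spark's default Java serializer is in use, and the helper name isJavaSerializable is made up for illustration. It simply tries to write a value with plain Java serialization, which is roughly what happens to each element that leaves the executor:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializableCheck {
  // Hypothetical helper: returns true if `value` can be written with plain
  // Java serialization, false if it throws NotSerializableException.
  def isJavaSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // String implements java.io.Serializable, so this prints true
    println(isJavaSerializable("a"))
    // ByteArrayInputStream does not, so this prints false
    println(isJavaSerializable(new ByteArrayInputStream("a".getBytes)))
  }
}
```

The second check fails for the same reason the collect in the question does: the element type produced by map is not serializable.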
foreach takes a function from T to Unit (which is analogous to Java void) and by itself returns nothing. Everything happens locally on the executor; no results are sent over the network, so there is no need to serialize them. Unlike map, foreach should be used when you want some kind of side effect, like in your previous question.
On a side note, the two functions you pass are actually different. The anonymous function you use in map has this type:
(s: String) => java.io.ByteArrayInputStream
and the one you use in foreach has this type:
(s: String) => Unit
If you use the second function with map, your code will compile, although the result would be far from what you want (RDD[Unit]).
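The same type difference can be reproduced with an ordinary Scala collection, which is a rough stand-in for an RDD here and not Spark code. The object and value names below are made up for illustration:

```scala
import java.io.ByteArrayInputStream

object MapVsForeachTypes {
  // Same shape as the function passed to map in the question:
  // String => ByteArrayInputStream
  val toStream: String => ByteArrayInputStream =
    s => new ByteArrayInputStream(s.getBytes)

  // Same shape as the function passed to foreach in the question:
  // String => Unit (the stream never leaves the function body)
  val printStream: String => Unit = s => {
    val is = new ByteArrayInputStream(s.getBytes)
    println("is = " + is)
  }

  def main(args: Array[String]): Unit = {
    val input = Seq("a", "b")
    // The result holds the streams themselves
    val streams: Seq[ByteArrayInputStream] = input.map(toStream)
    // Compiles as well, but the result holds only unit values
    val units: Seq[Unit] = input.map(printStream)
    println(streams.size)
    println(units)
  }
}
```

With the Unit-returning function the stream is created and discarded inside the function, so the only values left to return are (), and a serializer never sees the stream.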
Upvotes: 3