Reputation: 3066
I've run into a weird problem where the result is not correct. I have a class called A, and it has a value called keyword. I want to filter an RDD[A] to keep only the elements whose keyword is in a given set.
Spark environment: version 1.3.1, execution env: yarn-client
Here is the code:
class A ...

case class C(words: Set[String]) extends Serializable {
  def run(data: RDD[A])(implicit sc: SparkContext) = {
    data.collect { case x: A => x }
      .filter(y => words.contains(y.keyword))
      .foreach(println)
  }
}
// in main function
val data: RDD[A] = ....
val c = C(Set("abc"))
c.run(data)
The code above prints nothing. However, if I bring the data to the local machine first, it prints something! E.g.
data.take(1000).collect { case x: A => x }.filter(y => words.contains(y.keyword)).foreach(println)
How could this happen?
Let me ask another related question: should I make case class C extend Serializable? I don't think it is necessary.
Upvotes: 1
Views: 58
Reputation: 3055
The reason is quite simple. If you run the println function after collecting the data locally, your data are transferred over the network to the machine you are working on (let's call it the client of the Spark environment) and then printed on your console. So far, everything behaves as expected. Instead, if you run the println function on a distributed RDD, the println is executed locally on the worker machine that holds your data. So the function is actually executed, but you won't see any result on the console of your client (unless the client is also a worker machine): everything is printed on the console of the respective worker node.
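If you actually want to see the matching elements on your console, a minimal sketch (reusing the question's A and C, and assuming the filtered result is small enough to fit in driver memory) is to filter on the cluster and collect the result back to the client before printing:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

case class C(words: Set[String]) extends Serializable {
  def run(data: RDD[A])(implicit sc: SparkContext): Unit = {
    data.filter(y => words.contains(y.keyword)) // runs on the workers
      .collect()                                // ships the matches to the client
      .foreach(println)                         // now prints on your console
  }
}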
No, it's not necessary to make it Serializable; the only thing that is serialized is your words: Set[String].
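As an aside, if you ever do run into a NotSerializableException with code like this, a common workaround (just a sketch, not something the code above requires) is to copy the field into a local val, so that the closure captures only the Set rather than the whole enclosing instance:

def run(data: RDD[A])(implicit sc: SparkContext): Unit = {
  val localWords = words // local copy: the closure captures only this Set
  data.filter(y => localWords.contains(y.keyword))
    .collect()
    .foreach(println)
}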
Upvotes: 2