worldterminator

Reputation: 3066

Spark not filtering correctly?

I have encountered a weird problem where the result is not correct. I have a class called A, which has a value called keyword. I want to filter an RDD[A] by whether its keyword is in a given set.

Spark environment: version 1.3.1, execution mode: yarn-client

Here is the code:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

class A ...  // has a field called keyword

case class C(words: Set[String]) extends Serializable {

  def run(data: RDD[A])(implicit sc: SparkContext) = {
    // keep only elements whose keyword is in the given set and print them
    data.collect { case x: A => x }.filter(y => words.contains(y.keyword)).foreach(println)
  }
}

   // in main function
  val data:RDD[A] = ....
  val c = C(Set("abc"))
  c.run(data)

The code above prints nothing. However, if I bring the RDD[A] to the local driver first, it prints something! E.g.

data.take(1000).collect { case x: A => x }.filter(y => words.contains(y.keyword)).foreach(println)

How could this happen?

Let me ask another related question: should I make case class C extend Serializable? I don't think it is necessary.

Upvotes: 1

Views: 58

Answers (1)

mgaido

Reputation: 3055

The reason is quite simple. When you run the println function after collecting the data locally, the data is transferred over the network to the machine you are using (let's call it the client of the Spark environment) and then printed on your console. So far, everything behaves as expected. Instead, if you run the println function on a distributed RDD, println is executed locally on the worker machines that hold your data. The function is actually executed, but you won't see any output on the console of your client (unless the client is also a worker machine): everything is printed on the console of the respective worker nodes.
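If the goal is to see the matching records on the client, a minimal sketch (assuming A exposes a keyword field, as in the question) is to filter on the cluster and only bring the matches back to the driver before printing:

  // Filter on the executors, then pull only the matches back to the driver.
  // Assumes A has a `keyword` field, as in the question.
  val matches = data.filter(y => words.contains(y.keyword)).collect()  // Array[A] on the driver
  matches.foreach(println)                                             // printed on the client console

Note that collect() pulls the entire result to the driver, so it is only safe when the filtered output is small; take(n) is an alternative when you only need a sample.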

No, it's not necessary to make it Serializable; the only thing that is serialized is your words:Set[String].
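As a sketch of that idea (an assumption about how you might restructure the method, not something from the original answer), you can copy the field into a local val so that the closure shipped to the executors captures only the Set rather than the enclosing instance:

  def run(data: RDD[A])(implicit sc: SparkContext): Unit = {
    val localWords = words  // local copy: the closure captures only this Set, not the whole C instance
    data.filter(y => localWords.contains(y.keyword)).foreach(println)
  }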

Upvotes: 2
