lserlohn

Reputation: 6206

Why foreachRDD in Spark Streaming + Kafka is slow, should it be?

I am using Spark 2.1 and Kafka 0.8.x for a Spark Streaming job. It is a text-filtering job, and most of the text will be filtered out during processing. I implemented it in two different ways:

  1. Filter directly on the output of the direct stream:

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    val jsonMsg = messages.map(_._2)
    val filteredMsg = jsonMsg.filter(x=>x.contains(TEXT1) && x.contains(TEXT2) && x.contains(TEXT3))
    
  2. Use the foreachRDD function:

     messages.foreachRDD { rdd =>
       val record = rdd.map(_._2).filter(x => x.contains(TEXT1) &&
                                              x.contains(TEXT2) &&
                                              x.contains(TEXT3))
     }
    

I found the first method is noticeably faster than the second method, but I am not sure this is the common case.

Is there any difference between method 1 and method 2?

Upvotes: 1

Views: 689

Answers (1)

ImDarrenG

Reputation: 2345

filter is a transformation. Transformations are evaluated lazily: they do nothing until you perform an action, such as foreachRDD, writing the data out, etc.

So in method 1 nothing is actually happening, which is why it appears significantly faster than method 2, which uses the action foreachRDD to do real work.
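You can see the same lazy-evaluation behaviour with a plain Scala Iterator, which is a convenient stand-in for an RDD here because no Spark cluster is needed (the sample strings and the counter are illustrative, not from the question):

```scala
// RDD transformations are lazy; Iterator transformations behave the same way.
var evaluated = 0
val lines = Iterator("a TEXT1 TEXT2 TEXT3", "b", "c TEXT1 TEXT2 TEXT3")

// filter is a transformation: defining it runs no predicate calls yet
val filtered = lines.filter { x =>
  evaluated += 1
  x.contains("TEXT1") && x.contains("TEXT2") && x.contains("TEXT3")
}
val callsBeforeAction = evaluated  // still 0: nothing has been evaluated

// toList plays the role of an action: it forces the pipeline to run
val kept = filtered.toList
val callsAfterAction = evaluated   // now 3: the predicate ran once per record
```

In the question's terms, attaching an action to method 1, e.g. `filteredMsg.foreachRDD(rdd => rdd.count())`, would force the filter to execute and make the two timings comparable.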

Upvotes: 1
