Sudheer Palyam

Reputation: 2519

Kafka direct stream: DStream map does not print

I have this simple Kafka Stream

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

// Each Kafka message is a flight
val flights = messages.map(_._2)

flights.foreachRDD( rdd => {
  println("--- New RDD with " + rdd.partitions.length + " partitions and " + rdd.count() + " flight records");
  rdd.map { flight => {        
    val flightRows = FlightParser.parse(flight)
    println ("Parsed num rows: " + flightRows)
    }
  }          
})

ssc.start()
ssc.awaitTermination()

Kafka has messages, and Spark Streaming is able to get them as RDDs. But the second println in my code does not print anything. I looked at the driver console logs when running in local[2] mode, and checked the YARN logs when running in yarn-client mode.

What am I missing?

If I replace rdd.map with the following code, it prints correctly in the Spark driver console:

for(flight <- rdd.collect().toArray) {
     val flightRows = FlightParser.parse(flight)
     println ("Parsed num rows: " + flightRows)
}

But I'm afraid that the processing of this flight object might then happen in the Spark driver process instead of on the executors. Please correct me if I'm wrong.

Thanks

Upvotes: 2

Views: 1004

Answers (1)

maasg

Reputation: 37435

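rdd.map is a lazy transformation. It won't be materialized unless an action is called on the resulting RDD. In the question's code the result of rdd.map is discarded, so no action ever triggers it and the println inside never runs.
In this specific case, we could use rdd.foreach, which is one of the most generic actions on an RDD, giving us access to each element in the RDD.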

flights.foreachRDD{ rdd => 
    rdd.foreach { flight =>        
        val flightRows = FlightParser.parse(flight)
        println ("Parsed num rows: " + flightRows) // prints on the stdout of each executor independently
    }
}

Given that this RDD action is executed on the executors, we will find the println output in each executor's STDOUT, not in the driver's.
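To see the laziness in isolation, here is a minimal sketch, assuming Spark is on the classpath and the job runs in local[2] mode. The object name LazyMapDemo and the sample strings are made up for illustration and stand in for the Kafka messages.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyMapDemo {
  // Returns the total number of comma-separated fields across the sample lines.
  def run(): Int = {
    // In local mode the tasks run as threads inside the driver JVM,
    // so every println below ends up in the same console.
    val conf = new SparkConf().setMaster("local[2]").setAppName("LazyMapDemo")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.parallelize(Seq("a,b,c", "d,e")) // illustrative data

      // Transformation: this only records what to do. Nothing is executed yet.
      val fieldCounts = lines.map { line =>
        println("mapping " + line)
        line.split(",").length
      }
      println("map declared, nothing has run yet")

      // Action: forces the map above to run, one task per partition.
      fieldCounts.reduce(_ + _)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println("total fields: " + run())
}
```

The "mapping ..." lines only show up once reduce is called. Dropping that call reproduces the silent behaviour from the question.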

If you would like to print the data on the driver instead, you can collect the data of the RDD within the DStream.foreachRDD closure.

flights.foreachRDD{ rdd => 
  val allFlights = rdd.collect() 
  println(allFlights.mkString("\n")) // prints to the stdout of the driver
}
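On the concern raised in the question: collect() itself only moves data to the driver, so anything done to the collected array afterwards runs on the driver. One way to keep the parsing on the executors and still print on the driver is to map first and then bring back only the parsed results. The sketch below assumes a local Spark setup. Its parse function is a hypothetical stand-in for FlightParser.parse, whose real signature is not shown in the question, and parsedSample is an illustrative helper name.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ParseOnExecutors {
  // Hypothetical stand-in for FlightParser.parse.
  def parse(flight: String): Seq[String] = flight.split(",").toSeq

  // Same shape as the body of flights.foreachRDD { rdd => ... }.
  def parsedSample(rdd: RDD[String], limit: Int): Array[Seq[String]] =
    rdd.map(parse)   // lazy; executed on the executors once an action runs
       .take(limit)  // action; returns at most `limit` parsed results to the driver

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ParseOnExecutors")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(Seq("QF1,SYD,LHR", "QF2,LHR,SYD")) // illustrative data
      // This println runs on the driver, over results parsed on the executors.
      parsedSample(rdd, 10).foreach(rows => println("Parsed num rows: " + rows.size))
    } finally {
      sc.stop()
    }
  }
}
```

Using take instead of collect is a deliberate choice here: collect pulls the entire RDD into driver memory, which is fine for debugging small batches but risky for large ones.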

Upvotes: 2
