Reputation: 2519
I have this simple Spark Streaming job consuming from Kafka:
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
// Each Kafka message is a flight
val flights = messages.map(_._2)
flights.foreachRDD( rdd => {
  println("--- New RDD with " + rdd.partitions.length + " partitions and " + rdd.count() + " flight records")
  rdd.map { flight =>
    val flightRows = FlightParser.parse(flight)
    println("Parsed num rows: " + flightRows)
  }
})
ssc.start()
ssc.awaitTermination()
Kafka has messages, and Spark Streaming is able to receive them as RDDs. But the second println in my code never prints anything. I looked at the driver console logs when running in local[2] mode, and checked the YARN logs when running in yarn-client mode.
What am I missing?
Instead of rdd.map, the following code prints fine in the Spark driver console:
for (flight <- rdd.collect().toArray) {
  val flightRows = FlightParser.parse(flight)
  println("Parsed num rows: " + flightRows)
}
But I'm afraid that the processing of this flight object might happen in the Spark driver process instead of on the executors. Please correct me if I'm wrong.
Thanks
Upvotes: 2
Views: 1004
Reputation: 37435
rdd.map is a lazy transformation: it won't be materialized unless an action is called on that RDD.
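For illustration, here's a minimal sketch of the difference (reusing the question's FlightParser):

// Lazy: builds a new RDD, but nothing runs on the cluster yet.
val parsed = rdd.map(flight => FlightParser.parse(flight))
// Action: forces the map above to actually execute on the executors.
parsed.count()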
In this specific case, we could use rdd.foreach, which is one of the most generic actions on an RDD, giving us access to each element:
flights.foreachRDD { rdd =>
  rdd.foreach { flight =>
    val flightRows = FlightParser.parse(flight)
    println("Parsed num rows: " + flightRows) // prints on the stdout of each executor independently
  }
}
Given that this RDD action is executed on the executors, we will find the println output in each executor's stdout (e.g. via the Executors tab of the Spark UI, or the per-container stdout logs on YARN), not in the driver console.
If you would like to print the data on the driver instead, you can collect the RDD's data within the DStream.foreachRDD closure:
flights.foreachRDD { rdd =>
  val allFlights = rdd.collect()
  println(allFlights.mkString("\n")) // prints to the stdout of the driver
}
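Note that collect() pulls the whole RDD into the driver's memory, so it is only safe for small batches. If the batches might be large, one option is to ship only a few records to the driver with take; a minimal sketch, again reusing the question's FlightParser:

flights.foreachRDD { rdd =>
  // take(n) ships only the first n elements to the driver,
  // avoiding out-of-memory risk on large batches.
  rdd.take(10).foreach { flight =>
    println("Parsed num rows: " + FlightParser.parse(flight))
  }
}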
Upvotes: 2