Reputation: 387
I have a Kafka producer that reads files from a directory and writes the contents of each file to a topic:
import java.io.File
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

def main(args: Array[String]): Unit = {
  val Array(brokers, topic, messagesPerSec, wordsPerMessage) = Array("quickstart.cloudera:9092", "test", "10", "10")
  val directoryPath = "/home/cloudera/Documents/config/"
  // Kafka producer properties: broker list plus key/value serializers
  val props = new HashMap[String, Object]()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)
  val myDirectory = new File(directoryPath)
  for (file <- myDirectory.listFiles) {
    // Read the whole file into one string and release the file handle afterwards
    val source = scala.io.Source.fromFile(file)
    val lines = try source.mkString finally source.close()
    // Each file is sent as a single message with a null key
    val message = new ProducerRecord[String, String](topic, null, lines)
    producer.send(message)
    print(lines)
    Thread.sleep(1000)
  }
  // Flush pending sends and release the producer's resources
  producer.close()
}
Similarly, I am using Spark direct streaming as my consumer:
val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](ssc, kafkaConf, Set(topic)).map(_._2)
lines.print(10) // print returns Unit, so there is nothing useful to assign
I am able to print the content of the file. I am using a single topic. I need to fetch the RDD from this DStream and collect its entire content into a String object so that I can pass it to a method. Can someone help?
Upvotes: 1
Views: 5790
Reputation: 741
The API you are looking for is:
DStream.foreachRDD(func)
It applies a function, func, to each RDD generated from the stream. So, for your use case, I would probably write the following code:
lines.foreachRDD(rdd => {
  val data = rdd.collect().mkString("\n")
  println(data)
})
Please note that the function passed to foreachRDD runs on the driver process, and collect() pulls every record of the batch into driver memory, so you have to make sure the driver has enough resources to hold the given files. Usually, one should use this API to push the data in each RDD to an external system, such as saving the RDD to files or writing it over the network to a database.
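To connect this back to the question, here is a minimal sketch of handing the collected content to a method. It assumes `lines` is the `DStream[String]` from the question; `BatchToString`, `processContent`, `joinRecords` and `attach` are hypothetical names introduced only for illustration, and `processContent` stands in for whatever method needs the String.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object BatchToString {
  // Hypothetical stand-in for the method that needs the content as one String.
  def processContent(content: String): Unit =
    println(s"Received ${content.length} characters")

  // Joins the collected records of one batch with newlines.
  def joinRecords(records: Array[String]): String = records.mkString("\n")

  // Registers the output operation; call this before ssc.start().
  def attach(lines: DStream[String]): Unit =
    lines.foreachRDD { (rdd: RDD[String]) =>
      // Skip micro-batches that received no messages.
      if (!rdd.isEmpty()) {
        // collect() brings every record of this batch to the driver.
        processContent(joinRecords(rdd.collect()))
      }
    }
}
```

Since the producer in the question sends each file as a single message, every record in the RDD is one file's content, and a batch may contain several files. If the method should see one file at a time, `rdd.collect().foreach(processContent)` would be the likely alternative to joining them.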
You can read more about the other output operations of DStreams in the Spark Streaming programming guide.
Upvotes: 5