Reputation: 654
I have streamed data from Kafka topics using Spark. This is the code I have tried; here I am just displaying the streaming data in the console. I want to store this data as text files in HDFS.
import _root_.kafka.serializer.DefaultDecoder
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.storage.StorageLevel

object StreamingDataNew {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Kafka").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val kafkaConf = Map(
      "metadata.broker.list" -> "localhost:9092",
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "kafka-streaming-example",
      "zookeeper.connection.timeout.ms" -> "200000"
    )
    // receiver-based stream: subscribe to "topic-one" with one receiver thread
    val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
      ssc,
      kafkaConf,
      Map("topic-one" -> 1),
      StorageLevel.MEMORY_ONLY
    )
    println("printing " + lines.toString())
    val words = lines.flatMap { case (_, value) => value.split(" ") }
    words.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
I found that we can write the DStream using saveAsTextFiles, but can someone clearly describe the steps for connecting to the Hortonworks cluster and storing the data in HDFS using the above Scala code?
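From the DStream docs I believe the call would look something like the sketch below, but I am not sure what the right HDFS URI is for the Hortonworks cluster (the namenode host and output path here are placeholders, not real values):

// each batch interval produces a directory named <prefix>-<batchTime>.txt
words.saveAsTextFiles("hdfs://<namenode-host>:8020/user/<user>/kafka-words", "txt")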
Upvotes: 2
Views: 1647
Reputation: 654
I found the answer. This code worked for me.
package com.spark.streaming

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object MessageStreaming {
  def main(args: Array[String]): Unit = {
    println("Message streaming")
    val conf = new org.apache.spark.SparkConf().setMaster("local[*]").setAppName("kafka-streaming")
    val context = new SparkContext(conf)
    val ssc = new StreamingContext(context, org.apache.spark.streaming.Seconds(10))
    val kafkaParams = Map(
      "bootstrap.servers" -> "kafka.kafka-cluster.com:9092",
      "group.id" -> "kafka-streaming-example",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      "zookeeper.connection.timeout.ms" -> "200000" // ignored by the new consumer API
    )
    val topics = Array("cdc-classic")
    // direct stream (Kafka 0.10+ consumer API): no receivers, offsets tracked by Spark
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))
    // drop records with null values before writing
    val content = stream.filter(x => x.value() != null)
    val sqlContext = new org.apache.spark.sql.SQLContext(context)
    import sqlContext.implicits._
    content.map(_.value).foreachRDD(rdd => {
      rdd.foreach(println)
      if (!rdd.isEmpty()) {
        // one JSON file per batch, appended under the HDFS output directory
        rdd.toDF("value").coalesce(1).write.mode(SaveMode.Append)
          .json("hdfs://dev1a/user/hg5tv0/hadoop/MessagesFromKafka")
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
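If you specifically want plain text files rather than JSON, the DStream's built-in saveAsTextFiles mentioned in the question also works here. A minimal sketch, reusing the same HDFS path from above:

// Spark appends the batch timestamp to the prefix, so each 10-second
// batch lands in its own MessagesFromKafka-<batchTime>.txt directory
content.map(_.value)
  .saveAsTextFiles("hdfs://dev1a/user/hg5tv0/hadoop/MessagesFromKafka", "txt")

I kept the DataFrame route in the answer because SaveMode.Append collects all batches under a single directory, whereas saveAsTextFiles creates a new directory per batch interval.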
Upvotes: 4