Shashank S

Reputation: 161

dstream parse JSON and save to textFile : SparkStreaming

I have a Kafka topic in which data is stored in JSON format. I have written a Spark Streaming job, and I want to save just the values from the Kafka topic to a file in HDFS.

This is how the data in my Kafka topic looks:

{"group_city":"\"Washington\"","group_country":"\"us\"","event_name":"\"Outdoor Afro Goes Ziplining\""}

Below is the code I have written. When I print it, I get the parsed JSON, but my problem comes when I try to save just the values to a text file.

val dstream = KafkaUtils.createDirectStream[String, String](
  ssc,
  preferredHosts,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

// ___PRINTING RECORDS________
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val values = record.value()
    val tweet = scala.util.parsing.json.JSON.parseFull(values)
    val map: Map[String, String] = tweet.get.asInstanceOf[Map[String, String]]
    map.foreach(p => println(p._2))
  }
}

Upvotes: 1

Views: 1390

Answers (1)

koiralo

Reputation: 23109

You can save the RDD with saveAsTextFile, but since you only want to save the values, you can instead convert them to a DataFrame and write it out as a CSV:

dstream.foreachRDD(rawRDD => {

  // extract just the value of each ConsumerRecord
  val rdd = rawRDD.map(_.value())

  // either save the raw JSON strings directly
  rdd.saveAsTextFile("file path")

  // or read the JSON strings into a dataframe and write it as a csv
  spark.read.json(rdd).write.mode(SaveMode.Append).csv("path for output")
})
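One detail worth noting: in the sample record each value carries embedded escaped quotes ("\"Washington\""), so "just the values" still contain literal quote characters unless you strip them. Below is a minimal plain-Scala sketch (no Spark needed) of that extraction for a single record; the object name ValueExtractSketch and the regex are illustrative simplifications that assume flat string-to-string JSON like the sample. A real job should keep using a JSON parser or spark.read.json as above.

```scala
// Sketch: pull out only the values of a flat {"k":"v",...} JSON record,
// stripping the escaped inner quotes that the sample data carries.
object ValueExtractSketch {
  // Matches "key":"value" pairs; the lookahead stops the lazy value group
  // at the quote that precedes either the next pair or the closing brace.
  private val Pair = "\"([^\"]+)\":\"(.*?)\"(?=,\"|\\}$)".r

  def values(json: String): Seq[String] =
    Pair.findAllMatchIn(json)
      .map(_.group(2).replace("\\\"", "")) // drop the escaped inner quotes
      .toSeq

  def main(args: Array[String]): Unit = {
    val record =
      """{"group_city":"\"Washington\"","group_country":"\"us\"","event_name":"\"Outdoor Afro Goes Ziplining\""}"""
    values(record).foreach(println)
  }
}
```

Running this on the sample record prints each cleaned value on its own line, which is the shape of output that would land in the text file.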

Hope this helps!

Upvotes: 1
