Reputation: 31
I am writing Spark Scala code that reads a continuous stream from an MQTT server. I am running my job in YARN cluster mode, and I want to save and append this stream to a single text file in HDFS.
I receive a batch of data every second, so I need this data appended to a single text file in HDFS.
Can anyone help?
Upvotes: 3
Views: 10731
Reputation: 293
@Amrutha J Raj
rdd.toDF("value").coalesce(1).write.mode(SaveMode.Append).json("C:/data/spark/")
This converts the RDD to a DataFrame. coalesce(1) merges the partitions so each write produces a single part file (without it, Spark may generate several files per batch), and SaveMode.Append adds each batch's output to the existing directory instead of overwriting it. The output here is written in JSON format.
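Since the question targets HDFS, the same write works against an HDFS URI instead of a local path. A minimal sketch, assuming a SparkSession in scope as spark (for the implicits) and a hypothetical output directory:

import org.apache.spark.sql.SaveMode
import spark.implicits._

// Append the current batch as JSON under an HDFS directory (path is hypothetical).
// Note: Append adds a new part file per batch; it does not grow one single file.
rdd.toDF("value")
  .coalesce(1)
  .write
  .mode(SaveMode.Append)
  .json("hdfs:///data/spark/")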
Upvotes: 1
Reputation: 1636
Use a DataFrame with SaveMode.Append; this will append the data every time a new batch arrives.
import org.apache.spark.sql.{SQLContext, SaveMode}

val sqlContext = new SQLContext(context)
import sqlContext.implicits._

stream.map(_.value).foreachRDD(rdd => {
  rdd.foreach(println)  // debug: print each record in the batch
  if (!rdd.isEmpty()) {
    // Convert the batch to a single-column DataFrame and append it
    // as a new part file under the output directory.
    rdd.toDF("value").coalesce(1).write.mode(SaveMode.Append).save("C:/data/spark/")
    // rdd.saveAsTextFile("C:/data/spark/")
  }
})
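Note that SaveMode.Append adds a new part file per micro-batch rather than growing one file. If a literally single HDFS text file is required, one option is the Hadoop FileSystem append API. A minimal sketch, assuming the same stream DStream, small per-second batches, a hypothetical output path, and that appends are enabled on the cluster (dfs.support.append):

import org.apache.hadoop.fs.{FileSystem, Path}

stream.map(_.value).foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    // Collect the batch on the driver; acceptable only for small batches.
    val lines = rdd.collect().mkString("", "\n", "\n")
    val fs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
    val file = new Path("hdfs:///data/spark/output.txt")
    // Append to the file if it already exists, otherwise create it.
    val out = if (fs.exists(file)) fs.append(file) else fs.create(file)
    try out.write(lines.getBytes("UTF-8")) finally out.close()
  }
})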
Upvotes: 2