JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
My HDFS directory contains JSON files. How can I read them as a stream?
You can use textFileStream to read the files as plain text and convert them to DataFrames afterwards:
val dstream = ssc.textFileStream("path to hdfs directory")
This gives you a DStream[String], i.e. a continuous sequence of RDD[String] batches. You can then process the RDD produced for each batch interval:
dstream.foreachRDD(rdd => {
  // convert this batch's RDD[String] of JSON lines into a DataFrame
  val df = spark.read.json(rdd)
  // apply transformations or actions on df here
})
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
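Putting the pieces together, here is a minimal end-to-end sketch in Scala. It assumes Spark 2.x with a local SparkSession (adjust master/appName for your cluster) and a hypothetical HDFS path; `Seconds(1)` matches the `Duration(1000)` batch interval from the question:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object JsonDirStream {
  def main(args: Array[String]): Unit = {
    // assumption: running locally; change master/appName for a real cluster
    val spark = SparkSession.builder()
      .appName("json-dir-stream")
      .master("local[*]")
      .getOrCreate()

    // 1-second batch interval, as in the question's Duration(1000)
    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

    // hypothetical path; textFileStream only picks up files created in
    // the directory after the stream starts, not pre-existing ones
    val dstream = ssc.textFileStream("hdfs:///path/to/json/dir")

    dstream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // each line of each file is expected to be one JSON record
        val df = spark.read.json(rdd)
        df.show()
      }
    }

    ssc.start()            // start the computation
    ssc.awaitTermination() // wait for it to terminate
  }
}
```

Note that `textFileStream` monitors the directory for newly created files, so files already present when the stream starts are not re-read; move or copy new files in to have them picked up.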
Hope this helps.