Reputation: 443
I have many text file in local directory. Spark Program to read all the files and store it into database. For the moment, trying to read the files using text file stream not working.
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
/**
* Main Program
*/
object SparkMain extends App {
// Create a SparkContext to initialize Spark
val sparkConf: SparkConf =
new SparkConf()
.setMaster("local")
.setAppName("TestProgram")
// Create a spark streaming context with windows period 2 sec
val ssc: StreamingContext =
new StreamingContext(sparkConf, Seconds(2))
// Create text file stream
val sourceDir: String = "D:\\tmpDir"
val stream: DStream[String] = ssc.textFileStream(sourceDir)
case class TextLine(line: String)
val lineRdd: DStream[TextLine] = stream.map(TextLine)
lineRdd.foreachRDD(rdd => {
rdd.foreach(println)
})
// Start the computation
ssc.start()
// Wait for the computation to terminate
ssc.awaitTermination()
}
Input:
//1.txt
Hello World
Nothing print when stream the streaming. What is wrong in it?
Upvotes: 0
Views: 905
Reputation: 23099
TextFileStreaming
does not read the file that is already present in the directory. Start the program and create a new file or move the file from any other directory. The following program is simple word count for text file streaming
val sourceDir: String = "path to streaming directory"
val stream: DStream[String] = streamingContext.textFileStream(sourceDir)
case class TextLine(line: String)
val lineRdd: DStream[TextLine] = stream.map(TextLine)
lineRdd.foreachRDD(rdd => {
val words = rdd.flatMap(rdd => rdd.line.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
println("=====================")
wordCounts.foreach(println)
println("=====================" + rdd.count())
})
The ouput should be something like this
+++++++++++++++++++++++
=====================0
+++++++++++++++++++++++
(are,1)
(you,1)
(how,1)
(hello,1)
(doing,1)
=====================5
+++++++++++++++++++++++
=====================0
I hope this helps!
Upvotes: 1