Rumesh Krishnan

Reputation: 443

TextFileStreaming in spark scala

I have many text files in a local directory. I want a Spark program that reads all the files and stores them in a database. For the moment I am only trying to read the files with a text file stream, and it is not working.

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream

/**
  * Main Program
  */
object SparkMain extends App {

  // Create the Spark configuration
  val sparkConf: SparkConf =
    new SparkConf()
      .setMaster("local")
      .setAppName("TestProgram")

  // Create a Spark Streaming context with a batch interval of 2 seconds
  val ssc: StreamingContext =
    new StreamingContext(sparkConf, Seconds(2))

  // Create text file stream
  val sourceDir: String = "D:\\tmpDir"
  val stream: DStream[String] = ssc.textFileStream(sourceDir)

  case class TextLine(line: String)

  val lineRdd: DStream[TextLine] = stream.map(TextLine)

  lineRdd.foreachRDD(rdd => {
    rdd.foreach(println)
  })

  // Start the computation
  ssc.start()
  // Wait for the computation to terminate
  ssc.awaitTermination()
}

Input:

//1.txt
Hello World

Nothing is printed when I start the stream. What is wrong with it?

Upvotes: 0

Views: 905

Answers (1)

koiralo

Reputation: 23099

textFileStream does not read files that are already present in the directory when the job starts. Start the program, then create a new file in the directory or move one in from another directory. The following program is a simple word count over a text file stream:

  val sourceDir: String = "path to streaming directory"
  val stream: DStream[String] = streamingContext.textFileStream(sourceDir)

  case class TextLine(line: String)

  val lineRdd: DStream[TextLine] = stream.map(TextLine)

  lineRdd.foreachRDD(rdd => {
    // Split every line into words (parameter renamed so it no longer shadows the outer rdd)
    val words = rdd.flatMap(textLine => textLine.line.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    println("+++++++++++++++++++++++")
    wordCounts.foreach(println)
    println("=====================" + rdd.count())
  })

The output should look something like this:

+++++++++++++++++++++++
=====================0
+++++++++++++++++++++++
(are,1)
(you,1)
(how,1)
(hello,1)
(doing,1)
=====================5
+++++++++++++++++++++++
=====================0
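To get a non-empty batch like the one above, a new file has to appear in the monitored directory while the job is running. One way to do that, sketched here under some assumptions, is to write the file in a separate staging directory and then move it into the streaming directory in one step, so the stream never sees a half-written file. `FileDropper` and `dropFile` are hypothetical names introduced for this example, and both directories are assumed to exist on the same filesystem. As far as I know, Spark picks files by modification time, so the sketch writes a fresh file rather than moving an old one.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper, not part of Spark: drops a new file into a watched directory.
object FileDropper {

  /** Writes `content` to `stagingDir/name`, then moves it into `watchedDir`.
    * Assumes both directories exist and live on the same filesystem;
    * otherwise ATOMIC_MOVE throws AtomicMoveNotSupportedException.
    * Returns the path of the file inside `watchedDir`.
    */
  def dropFile(stagingDir: Path, watchedDir: Path, name: String, content: String): Path = {
    val staged = stagingDir.resolve(name)
    // Writing the file now gives it a fresh modification time
    Files.write(staged, content.getBytes(StandardCharsets.UTF_8))
    // Move in a single step so a reader never observes a partial file
    Files.move(staged, watchedDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
  }
}
```

With the streaming job already started, a call such as `FileDropper.dropFile(Paths.get("D:\\staging"), Paths.get("D:\\tmpDir"), "1.txt", "Hello World")` (the staging path is made up, and `java.nio.file.Paths` needs to be imported) should make the file show up in one of the next batches.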

I hope this helps!

Upvotes: 1
