Reputation: 124
I have to write a single output file for my streaming job.
Question: when will my job actually stop? I killed the server, but that did not work. I want to stop my job from the command line (if that is possible).
Code:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MAYUR_BELDAR_PROGRAM5_V1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Read lines from a socket on the port passed as the first argument.
    val lines = ssc.socketTextStream("localhost", args(0).toInt)
    val words = lines.flatMap(_.split(" "))

    // Split words into four classes by the parity of the first character and of the length.
    val class1 = words.filter(a => a.charAt(0).toInt % 2 == 0).filter(a => a.length % 2 == 0)
    val class2 = words.filter(a => a.charAt(0).toInt % 2 == 0).filter(a => a.length % 2 == 1)
    val class3 = words.filter(a => a.charAt(0).toInt % 2 == 1).filter(a => a.length % 2 == 0)
    val class4 = words.filter(a => a.charAt(0).toInt % 2 == 1).filter(a => a.length % 2 == 1)

    // Each batch is written to its own timestamped directory under these prefixes.
    class1.saveAsTextFiles("hdfs://hadoop1:9000/mbeldar/class1", "txt")
    class2.saveAsTextFiles("hdfs://hadoop1:9000/mbeldar/class2", "txt")
    class3.saveAsTextFiles("hdfs://hadoop1:9000/mbeldar/class3", "txt")
    class4.saveAsTextFiles("hdfs://hadoop1:9000/mbeldar/class4", "txt")

    ssc.start()            // Start the computation
    ssc.awaitTermination() // Blocks until the context is stopped, so the job never ends on its own
    ssc.stop()
  }
}
Upvotes: 0
Views: 288
Reputation: 2261
A stream by definition does not have an end, so it will not stop unless you call the method that stops it. In my case I have a business condition that tells me when the process is finished, so when I reach that point I call JavaStreamingContext.close(). I also have a monitor that checks whether the process has received any data in the past few minutes; if it has not, the monitor also closes the stream.
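For illustration, here is a minimal Scala sketch of that monitoring pattern. It is an assumption-heavy example, not the answerer's actual code: the port (9999), the 5-minute idle timeout, and the object name are invented, and it stops the stream with StreamingContext.stop rather than JavaStreamingContext.close.

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StopOnIdleSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StopOnIdleSketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Remember when data was last seen so the monitor below can decide when to stop.
    val lastDataSeen = new AtomicLong(System.currentTimeMillis())

    val lines = ssc.socketTextStream("localhost", 9999) // port 9999 is an assumption
    lines.foreachRDD { rdd =>
      if (!rdd.isEmpty()) lastDataSeen.set(System.currentTimeMillis())
    }

    ssc.start()

    // Monitor: poll every 10 seconds; stop gracefully after 5 minutes without data.
    val idleTimeoutMs = 5 * 60 * 1000L
    var terminated = false
    while (!terminated) {
      terminated = ssc.awaitTerminationOrTimeout(10000)
      if (!terminated && System.currentTimeMillis() - lastDataSeen.get() > idleTimeoutMs) {
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        terminated = true
      }
    }
  }
}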
In order to accumulate data across batches you have to use the method updateStateByKey (on a PairDStream). This method requires checkpointing to be enabled; a sketch follows below.
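As a rough sketch of what that looks like (the checkpoint directory, the port, and the output prefix below are placeholders, not taken from the question):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RunningCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("RunningCountSketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // updateStateByKey requires a checkpoint directory; this path is a placeholder.
    ssc.checkpoint("hdfs://hadoop1:9000/mbeldar/checkpoint")

    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

    // Merge this batch's counts into the running total kept in the state.
    def updateCount(newValues: Seq[Int], running: Option[Int]): Option[Int] =
      Some(newValues.sum + running.getOrElse(0))

    val runningCounts = words.map(w => (w, 1)).updateStateByKey[Int](updateCount _)

    // Each batch now writes output that reflects everything seen so far, not just that batch.
    runningCounts.saveAsTextFiles("hdfs://hadoop1:9000/mbeldar/runningCounts", "txt")

    ssc.start()
    ssc.awaitTermination()
  }
}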
I have checked the Spark code and found that saveAsTextFiles uses foreachRDD, so in the end it saves each RDD separately and previous RDDs are not taken into account. Using updateStateByKey it will still save multiple files, but each file will reflect all RDDs that were processed before it.
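If a single file per batch is the real goal, one possible variation (assuming the runningCounts DStream from the sketch above; the output path is again a placeholder) is to call foreachRDD yourself and coalesce the state to one partition before saving:

// Writes one part file per batch, each containing the full accumulated state.
runningCounts.foreachRDD { (rdd, time) =>
  rdd.coalesce(1)
     .saveAsTextFile(s"hdfs://hadoop1:9000/mbeldar/single-output-${time.milliseconds}")
}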
Upvotes: 1