Reputation: 7563
How can I check whether the SparkContext still has work executing, so that I can stop it once everything has finished? Currently I wait 30 seconds before calling SparkContext.stop; otherwise my app throws an error.
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.SparkContext
object RatingsCounter extends App {
// set the log level to print only errors
Logger.getLogger("org").setLevel(Level.ERROR)
// create a SparkContext using every core of the local machine, named RatingsCounter
val sc = new SparkContext("local[*]", "RatingsCounter")
// load up each line of the ratings data into an RDD (Resilient Distributed Dataset)
val lines = sc.textFile("src/main/resource/u.data", 0)
// convert each line to a string, split it on tabs, and extract the third field.
// The file format is userID, movieID, rating, timestamp
val ratings = lines.map(x => x.toString().split("\t")(2))
// count up how many times each value occurs
val results = ratings.countByValue()
// sort the resulting map of (rating, count) tuples
val sortedResults = results.toSeq.sortBy(_._1)
// print each result on its own line.
sortedResults.foreach { case (key, value) => println("rating: " + key + " - count: " + value) }
Thread.sleep(30000)
sc.stop()
}
Upvotes: 2
Views: 2246
Reputation: 40370
Spark applications should define a main() method instead of extending scala.App. Subclasses of scala.App may not work correctly.
Since you are extending App, you are getting unexpected behavior.
You can read more about it in the official documentation about Self Contained Applications.
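As a minimal sketch of that advice applied to the code in the question (not the only way to structure it): the object below defines main() explicitly and stops the context in a finally block. The names RatingsCounterMain and countRatings are hypothetical and introduced only for illustration, and the minPartitions argument is dropped so that Spark picks its default. Because countByValue() is an action that returns a local Map to the driver only after the job has finished, no Thread.sleep should be needed before sc.stop().

```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

// Hypothetical name, chosen to avoid clashing with the question's RatingsCounter.
object RatingsCounterMain {

  // Hypothetical helper: counts how often each rating occurs, sorted by rating.
  // countByValue() is an action, so this call blocks until the Spark job is done
  // and the result is a plain local collection on the driver.
  def countRatings(sc: SparkContext, path: String): Seq[(String, Long)] =
    sc.textFile(path)
      .map(_.split("\t")(2)) // file format: userID, movieID, rating, timestamp
      .countByValue()
      .toSeq
      .sortBy(_._1)

  def main(args: Array[String]): Unit = {
    // set the log level to print only errors
    Logger.getLogger("org").setLevel(Level.ERROR)
    val sc = new SparkContext("local[*]", "RatingsCounter")
    try {
      countRatings(sc, "src/main/resource/u.data").foreach { case (rating, count) =>
        println(s"rating: $rating - count: $count")
      }
    } finally {
      // runs once the job above has finished or failed
      sc.stop()
    }
  }
}
```

The try/finally is a design choice rather than a requirement: it only ensures the context is stopped even if the job throws.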
App uses DelayedInit, which can cause initialization issues. With a main method you know what's going on. Here is an excerpt from Reddit:
object HelloWorld extends App {
var a = 1
a + 1
override def main(args: Array[String]) {
println(a) // guess what the value of a is
}
}
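For contrast, here is a small sketch of the same idea without App (the name HelloWorldMain is hypothetical). As far as I can tell, on Scala 2 versions that still allow overriding main, the excerpt above prints 0 rather than 1, because the object body is deferred through DelayedInit and the overridden main never runs it. With a plain object, the field is initialized when the object is first accessed, so the order is explicit:

```scala
// Hypothetical name; a plain object with an explicit main, no DelayedInit involved.
object HelloWorldMain {
  var a = 1 // initialized when the object is first accessed

  def main(args: Array[String]): Unit = {
    a += 1
    println(a) // prints 2
  }
}
```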
Upvotes: 6