Reputation: 3463
I am building an application in Spark, and would like to use the SparkContext and/or SQLContext within methods in my classes, mostly to pull/generate data sets from files or SQL queries.
For example, I would like to create a T2P object which contains methods that gather data (and in this case need access to the SparkContext):
class T2P (mid: Int, sc: SparkContext, sqlContext: SQLContext) extends Serializable {
def getImps(): DataFrame = {
val imps = sc.textFile("file.txt").map(line => line.split("\t")).map(d => Data(d(0).toInt, d(1), d(2), d(3))).toDF()
return imps
}
def getX(): DataFrame = {
val x = sqlContext.sql("SELECT a,b,c FROM table")
return x
}
}
//creating the T2P object
class App {
val conf = new SparkConf().setAppName("T2P App").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val t2p = new T2P(0, sc, sqlContext);
}
Passing the SparkContext as an argument to the T2P class doesn't work since the SparkContext is not serializable (getting a task not serializable
error when creating T2P objects). What is the best way to use the SparkContext/SQLContext inside my classes? Or perhaps is this the wrong way to design a data pull type process in Spark?
UPDATE Realized from the comments on this post that the SparkContext was not the problem, but that I was using a using a method within a 'map' function, causing Spark to try to serialize the entire class. This would cause the error since SparkContext is not serializable.
def startMetricTo(userData: ((Int, String), List[(Int, String)]), startMetric: String) : T2PUser = {
//do something
}
def buildUserRollup() = {
this.userRollup = this.userSorted.map(line=>startMetricTo(line, this.startMetric))
}
This results in a 'task not serializable' exception.
Upvotes: 2
Views: 5603
Reputation: 21
I tried several options, this is what worked eventually for me..
object SomeName extends App {
val conf = new SparkConf()...
val sc = new SparkContext(conf)
implicit val sqlC = SQLContext.getOrCreate(sc)
getDF1(sqlC)
def getDF1(sqlCo: SQLContext): Unit = {
val query1 = SomeQuery here
val df1 = sqlCo.read.format("jdbc").options(Map("url" -> dbUrl,"dbtable" -> query1)).load.cache()
//iterate through df1 and retrieve the 2nd DataFrame based on some values in the Row of the first DataFrame
df1.foreach(x => {
getDF2(x.getString(0), x.getDecimal(1).toString, x.getDecimal(3).doubleValue) (sqlCo)
})
}
def getDF2(a: String, b: String, c: Double)(implicit sqlCont: SQLContext) : Unit = {
val query2 = Somequery
val sqlcc = SQLContext.getOrCreate(sc)
//val sqlcc = sqlCont //Did not work for me. Also, omitting (implicit sqlCont: SQLContext) altogether did not work
val df2 = sqlcc.read.format("jdbc").options(Map("url" -> dbURL, "dbtable" -> query2)).load().cache()
.
.
.
}
}
Note: In the above code, if I omitted (implicit sqlCont: SQLContext) parameter from getDF2 method signature, it would not work. I tried several other options of passing the sqlContext from one method to the other, it always gave me NullPointerException or Task not serializable Excpetion. Good thins is it eventually worked this way, and I could retrieve parameters from a row of the DataFrame1 and use those values in loading the DataFrame 2.
Upvotes: 0
Reputation: 3463
I fixed this problem (with the help of the commenters and other StackOverflow users) by creating a separate MetricCalc
object to store my startMetricTo() method. Then I changed the buildUserRollup() method to use this new startMetricTo(). This allows the entire MetricCalc
object to be serialized without issue.
//newly created object
object MetricCalc {
def startMetricTo(userData: ((Int, String), List[(Int, String)]), startMetric: String) : T2PUser = {
//do something
}
}
//using function in T2P
def buildUserRollup(startMetric: String) = {
this.userRollup = this.userSorted.map(line=>MetricCalc.startMetricTo(line, startMetric))
}
Upvotes: 1