DJElbow

Reputation: 3463

Spark - How to use SparkContext within classes?

I am building an application in Spark, and would like to use the SparkContext and/or SQLContext within methods in my classes, mostly to pull/generate data sets from files or SQL queries.

For example, I would like to create a T2P object which contains methods that gather data (and in this case need access to the SparkContext):

class T2P(mid: Int, sc: SparkContext, sqlContext: SQLContext) extends Serializable {

  def getImps(): DataFrame = {
    // toDF() needs the SQLContext implicits in scope; Data is a case class defined elsewhere
    import sqlContext.implicits._
    sc.textFile("file.txt")
      .map(line => line.split("\t"))
      .map(d => Data(d(0).toInt, d(1), d(2), d(3)))
      .toDF()
  }

  def getX(): DataFrame =
    sqlContext.sql("SELECT a,b,c FROM table")
}

//creating the T2P object
class App {
    val conf = new SparkConf().setAppName("T2P App").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val t2p = new T2P(0, sc, sqlContext);
}

Passing the SparkContext as an argument to the T2P class doesn't work since the SparkContext is not serializable (getting a task not serializable error when creating T2P objects). What is the best way to use the SparkContext/SQLContext inside my classes? Or perhaps is this the wrong way to design a data pull type process in Spark?

UPDATE: I realized from the comments on this post that passing the SparkContext was not the problem in itself. I was calling a method of the class inside a 'map' function, which caused Spark to try to serialize the entire class instance. That fails because the instance holds a SparkContext, which is not serializable.

def startMetricTo(userData: ((Int, String), List[(Int, String)]), startMetric: String) : T2PUser = {
  //do something 
}

def buildUserRollup() = {
  this.userRollup = this.userSorted.map(line=>startMetricTo(line, this.startMetric))
}

This results in a 'task not serializable' exception.
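A minimal sketch of the mechanism, using plain JVM serialization rather than Spark so that it runs without a cluster. FakeContext is a hypothetical stand-in for SparkContext (any non-serializable class would do), and Rollup and ClosureCheck are illustrative names that are not part of the application above. Spark's own closure check is more involved, so treat this as an approximation of what it does:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately NOT Serializable.
class FakeContext

// Mirrors the shape of T2P: Serializable itself, but holding a non-serializable field.
class Rollup(val ctx: FakeContext, val startMetric: String) extends Serializable {

  def startMetricTo(line: String, metric: String): String = s"$metric:$line"

  // Calling an instance method inside the function literal captures `this`,
  // so serializing the function drags the whole Rollup (and its ctx) along.
  def capturingThis: String => String =
    line => startMetricTo(line, this.startMetric)
}

object ClosureCheck {
  // Returns true if the value survives Java serialization.
  def serializes(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(value)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Under these assumptions, ClosureCheck.serializes(new Rollup(new FakeContext, "clicks").capturingThis) should come back false, because the function cannot be written out without also writing the FakeContext held by the instance.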

Upvotes: 2

Views: 5603

Answers (2)

Yusuf Arif

Reputation: 21

I tried several options; this is what eventually worked for me.

object SomeName extends App {

  val conf = new SparkConf()...
  val sc = new SparkContext(conf)

  implicit val sqlC = SQLContext.getOrCreate(sc)
  getDF1(sqlC)

  def getDF1(sqlCo: SQLContext): Unit = {
    val query1 = SomeQuery here
    val df1 = sqlCo.read.format("jdbc").options(Map("url" -> dbUrl, "dbtable" -> query1)).load.cache()

    // iterate through df1 and retrieve the 2nd DataFrame based on some values in the Row of the first DataFrame

    df1.foreach(x => {
      getDF2(x.getString(0), x.getDecimal(1).toString, x.getDecimal(3).doubleValue)(sqlCo)
    })
  }

  def getDF2(a: String, b: String, c: Double)(implicit sqlCont: SQLContext): Unit = {
    val query2 = Somequery

    val sqlcc = SQLContext.getOrCreate(sc)
    // val sqlcc = sqlCont // Did not work for me. Also, omitting (implicit sqlCont: SQLContext) altogether did not work
    val df2 = sqlcc.read.format("jdbc").options(Map("url" -> dbUrl, "dbtable" -> query2)).load().cache()
    .
    .
    .
  }
}

Note: In the above code, if I omitted the (implicit sqlCont: SQLContext) parameter from the getDF2 method signature, it would not work. I tried several other ways of passing the sqlContext from one method to the other, and they always gave me a NullPointerException or a Task not serializable exception. The good thing is that it eventually worked this way, and I could retrieve parameters from a row of the first DataFrame and use those values to load the second DataFrame.

Upvotes: 0

DJElbow

Reputation: 3463

I fixed this problem (with the help of the commenters and other StackOverflow users) by creating a separate MetricCalc object to hold my startMetricTo() method. Then I changed the buildUserRollup() method to call this new startMetricTo() and to take startMetric as a parameter. The function passed to map no longer references this, so Spark does not try to serialize the T2P instance and its SparkContext.

// newly created object
object MetricCalc {
  def startMetricTo(userData: ((Int, String), List[(Int, String)]), startMetric: String): T2PUser = {
    // do something
  }
}

// using the function in T2P
def buildUserRollup(startMetric: String) = {
  this.userRollup = this.userSorted.map(line => MetricCalc.startMetricTo(line, startMetric))
}
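To illustrate why this works, here is a small sketch that uses plain JVM serialization instead of Spark. StubContext is a hypothetical stand-in for SparkContext, and MetricCalcSketch, RollupFixed and SerializationCheck are made-up names for the demonstration, with simplified String arguments in place of the real user data types:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately NOT Serializable.
class StubContext

// Standalone object, as in the fix above; it holds no reference to any instance.
object MetricCalcSketch {
  def startMetricTo(line: String, metric: String): String = s"$metric:$line"
}

// Still holds the non-serializable context, like T2P does.
class RollupFixed(val ctx: StubContext) extends Serializable {
  // The function literal uses only the parameter `startMetric` and a call
  // into MetricCalcSketch, so it captures a String and not `this`.
  def viaObject(startMetric: String): String => String =
    line => MetricCalcSketch.startMetricTo(line, startMetric)
}

object SerializationCheck {
  // Returns true if the value survives Java serialization.
  def serializes(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(value)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

With this shape, SerializationCheck.serializes(new RollupFixed(new StubContext).viaObject("clicks")) should succeed even though the enclosing instance still holds a non-serializable field, since that instance is never part of the function being shipped.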

Upvotes: 1
