Reputation: 51
I have created a hiveContext in the main() function in Scala, and I need to pass this hiveContext as a parameter to other functions. This is the structure:
object Project {
  def main(name: String): Int = {
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    ...
  }
  def read (streamId: Int, hc:hiveContext): Array[Byte] = {
    ...
  }
  def close (): Unit = {
    ...
  }
}
But it doesn't work. The function read() is called inside main(). Any ideas?
Upvotes: 3
Views: 1442
Reputation: 21
I tried several options; this is what eventually worked for me:
object SomeName extends App {
  val conf = new SparkConf()...
  val sc = new SparkContext(conf)
  implicit val sqlC = SQLContext.getOrCreate(sc)
  getDF1(sqlC)
  def getDF1(sqlCo: SQLContext): Unit = {
    val query1 = SomeQuery here
    val df1 = sqlCo.read.format("jdbc").options(Map("url" -> dbUrl, "dbtable" -> query1)).load.cache()
    // Iterate through df1 and retrieve the 2nd DataFrame based on some values in the Row of the first DataFrame
    df1.foreach(x => {
      getDF2(x.getString(0), x.getDecimal(1).toString, x.getDecimal(3).doubleValue) (sqlCo)
    })
  }
  def getDF2(a: String, b: String, c: Double)(implicit sqlCont: SQLContext) : Unit = {
    val query2 = Somequery
    val sqlcc = SQLContext.getOrCreate(sc)
    //val sqlcc = sqlCont //Did not work for me. Also, omitting (implicit sqlCont: SQLContext) altogether did not work
    val df2 = sqlcc.read.format("jdbc").options(Map("url" -> dbUrl, "dbtable" -> query2)).load().cache()
    .
    .
    .
  }
}
Note: in the code above, if I omitted the (implicit sqlCont: SQLContext) parameter from the getDF2 method signature, it would not work. I tried several other ways of passing the sqlContext from one method to another, and they always gave me a NullPointerException or a Task not serializable exception. The good thing is that it eventually worked this way, and I could retrieve values from a row of the first DataFrame and use them to load the second DataFrame.
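A possible explanation for those exceptions, offered as an assumption rather than a diagnosis of the exact job above: DataFrame.foreach runs its closure on the executors, while SparkContext and SQLContext are only usable on the driver. If the first DataFrame is small enough to fit in driver memory, one way around this is to collect() its rows and loop over them on the driver. The sketch below uses the Spark 1.x API; the object name, the lookup helper, and the range-based stand-in data (instead of the JDBC queries) are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object DriverSideLookup {
  // Hypothetical helper: builds a second DataFrame from one value of the first.
  // It runs on the driver, so the implicit SQLContext is valid here.
  def lookup(id: Long)(implicit sqlContext: SQLContext): DataFrame =
    sqlContext.range(0, 10).filter(s"id = $id")

  // Collects the rows of df1 to the driver, then issues one lookup per row.
  def lookupCounts(df1: DataFrame)(implicit sqlContext: SQLContext): Array[Long] =
    df1.collect().map(row => lookup(row.getLong(0)).count())

  def main(args: Array[String]): Unit = {
    // local[*] is an assumption for trying this out; use your own master.
    val conf = new SparkConf().setAppName("DriverSideLookup").setMaster("local[*]")
    val sc = new SparkContext(conf)
    implicit val sqlContext: SQLContext = SQLContext.getOrCreate(sc)
    try {
      val df1 = sqlContext.range(0, 3) // stand-in for the first query
      println(lookupCounts(df1).mkString(","))
    } finally {
      sc.stop()
    }
  }
}
```

This issues one query per collected row, so it only makes sense for a small first DataFrame; for larger data a join between the two sources is usually a better fit.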
Upvotes: 1
Reputation: 7926
I declare the hiveContext as implicit, and this works for me:
implicit val sqlContext: HiveContext = new HiveContext(sc)
MyJob.run(conf)
Defined in MyJob:
override def run(config: Config)(implicit sqlContext: SQLContext): Unit = ...
If you don't want it to be implicit, this should work the same way:
val sqlContext: HiveContext = new HiveContext(sc)
MyJob.run(conf)(sqlContext)
override def run(config: Config)(sqlContext: SQLContext): Unit = ...
Also, your function read should declare the parameter hc with the type HiveContext, not hiveContext (which is the name of your value, not a type):
def read (streamId: Int, hc:HiveContext): Array[Byte] =
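Putting both variants together, here is a minimal sketch of the question's structure with the parameter type corrected. It assumes Spark 1.x with the spark-hive module on the classpath; the readImplicit name, the local[*] master, and the function bodies (which return a row count instead of the Array[Byte] from the question) are placeholders for illustration only.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object Project {
  // Explicit parameter: the type is the class HiveContext,
  // not the name of the value (hiveContext) created in main.
  def read(streamId: Int, hc: HiveContext): Long =
    hc.range(0, streamId.toLong).count() // placeholder body, just uses hc

  // Implicit parameter: the compiler supplies the HiveContext in scope.
  def readImplicit(streamId: Int)(implicit hc: HiveContext): Long =
    read(streamId, hc)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Project").setMaster("local[*]")
    val sc = new SparkContext(conf)
    implicit val hiveContext: HiveContext = new HiveContext(sc)
    try {
      println(read(1, hiveContext)) // passed explicitly
      println(readImplicit(2))      // resolved implicitly
    } finally {
      sc.stop()
    }
  }
}
```

The explicit form keeps the dependency visible at every call site; the implicit form saves repetition when many functions need the same context.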
Upvotes: 2