Reputation: 4343
It's easy and simple in the toy examples that show how to program in Spark. You just import, create, use, and discard, all in one little function.
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.hive.HiveContext

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("example")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  val hiveContext = new HiveContext(sc)
  import hiveContext.implicits._
  import hiveContext.sql

  // load data from hdfs
  val df1 = sc.textFile("hdfs://.../myfile.csv").map(...)
  val df1B = sc.broadcast(df1)

  // load data from hive
  val df2 = sql("select * from mytable")

  // transform df2 with df1B
  val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
  val df2_new = df2.withColumn("myCol", cleanCol)

  ...

  sc.stop()
}
In the real world, I find myself writing quite a few functions to modularize the tasks. For example, I would have a few functions just to load the different data tables. And in these load functions I would call other functions to do necessary data cleaning/transformation as I load the data. Then I would pass the contexts like so:
def loadHdfsFileAndBroadcast(sc: SparkContext) = {
  // use sc here
  val df = sc.textFile("hdfs://.../myfile.csv").map(...)
  val dfB = sc.broadcast(df)
  dfB
}
def loadHiveTable(hiveContext: HiveContext, df1B: Broadcast[Map[String, String]]) = {
  import hiveContext.implicits._
  val data = hiveContext.sql("select * from myHiveTable")
  // data cleaning
  val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
  val df_cleaned = data.withColumn("myCol", cleanCol)
  df_cleaned
}
As you can see, the load function signatures get heavy quite easily.
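For instance, a loader that needs both contexts plus a broadcast variable quickly ends up with something like this (a hypothetical sketch, the name and body are just placeholders):
// every dependency has to be threaded through explicitly
def loadAndJoinTables(sc: SparkContext,
                      hiveContext: HiveContext,
                      df1B: Broadcast[Map[String, String]]): DataFrame = ???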
I've tried putting these context imports outside the main function, at the class level. But that causes problems (see this issue), which leaves me no option but to pass them around.
Is this the way to go or is there a better way to do this?
Upvotes: 8
Views: 1719
Reputation: 30310
First, let me say I'm glad that someone is exploring writing clean code in Spark. That is something I find critical, but it always seems like people are so focused on the analytics themselves that they lose sight of maintainability.
I also agree that Spark poses interesting challenges in that regard. The best way I've found, and of course you might feel this isn't an improvement, is to use traits with abstract method definitions and mix those into the object that orchestrates everything.
For example:
trait UsingSparkContextTrait {
  def sc: SparkContext

  def loadHdfsFileAndBroadcast = {
    val df = sc.textFile("hdfs://.../myfile.csv").map(...)
    sc.broadcast(df)
  }
}

trait UsingHiveContextTrait {
  def hiveContext: HiveContext
  def df1B: Broadcast[Map[String, String]]

  def loadHiveTable = {
    val data = hiveContext.sql("select * from myHiveTable")
    val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
    data.withColumn("myCol", cleanCol)
  }
}
And then finally:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext

class ClassDoingWork extends UsingSparkContextTrait with UsingHiveContextTrait {
  val conf = new SparkConf().setAppName("example")
  val sc = new SparkContext(conf)        // Satisfies sc in UsingSparkContextTrait
  val sqlContext = new SQLContext(sc)
  val hiveContext = new HiveContext(sc)  // Satisfies hiveContext in UsingHiveContextTrait
  val df1B = loadHdfsFileAndBroadcast    // Satisfies df1B in UsingHiveContextTrait

  import hiveContext.implicits._
  import hiveContext.sql

  def doAnalytics = {
    val dfCleaned = loadHiveTable
    ...
  }
}
The cool thing about this dependency injection-ish approach is that you will know at compile-time if you are missing any of the components you need for your code to execute.
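To make that concrete, here is a sketch of what the compile-time check looks like (my own illustration, not part of the code above; the class name is made up, and it is commented out because it intentionally does not compile):
// class MissingContext extends UsingSparkContextTrait {
//   // no `val sc = ...` here, so scalac rejects this with something like:
//   // "class MissingContext needs to be abstract, since method sc in trait
//   //  UsingSparkContextTrait of type => org.apache.spark.SparkContext is not defined"
// }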
Finally, on a much simpler note, you can also access the SparkContext from an RDD instance with rdd.context. That could prove useful too.
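A minimal sketch of how that helps here (my own example; broadcastLookup and the RDD's element type are assumptions): a helper can take only an RDD and still reach the driver-side context to create a broadcast variable.
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

// No SparkContext parameter needed: the RDD already carries a reference to it.
def broadcastLookup(rdd: RDD[(String, String)]): Broadcast[collection.Map[String, String]] =
  rdd.context.broadcast(rdd.collectAsMap())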
Upvotes: 8
Reputation: 4125
If all of your methods are defined in a single object/class, you could make the contexts members of that object/class and always reference that shared instance. If you provide them in the constructor, you can even safely do the implicit imports only once and have access to those methods everywhere in your class/object.
For instance, if the contexts are defined as implicits in the calling object
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object testObject {
  def main(args: Array[String]): Unit = {
    val sconf = new SparkConf().setMaster("local[2]").setAppName("testObj")
    val rootLogger = Logger.getRootLogger
    rootLogger.setLevel(Level.ERROR)

    implicit val sc = new SparkContext(sconf)
    implicit val sqlContext = new SQLContext(sc)

    new foo().run()
  }
}
then you can use them in the class below, which actually holds your logic
case class OneVal(value: String)

class foo(implicit val sc: SparkContext, implicit val sqlC: SQLContext) {
  import sqlC.implicits._

  def run(): Unit = {
    doStuff().show(1)
    doOtherStuff().show(1)
  }

  def doStuff(): DataFrame = {
    sc.parallelize(List(OneVal("test"))).toDF()
  }

  def doOtherStuff(): DataFrame = {
    sc.parallelize(List(OneVal("differentTest"))).toDF()
  }
}
In this example, the import of sqlC.implicits._ is what makes the implicit toDF conversion available on the RDD.
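If it helps to see what the implicit is doing, the same conversion can be written explicitly (a minimal sketch reusing the sc and sqlC members of foo above; doStuffExplicitly is just an illustrative name):
def doStuffExplicitly(): DataFrame =
  sqlC.createDataFrame(sc.parallelize(List(OneVal("test"))))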
If run, this gives the output below, as expected:
+-----+
|value|
+-----+
| test|
+-----+
+-------------+
| value|
+-------------+
|differentTest|
+-------------+
Upvotes: 1