Knows Not Much

Reputation: 31546

Using Future inside of a Spark job

I want to perform 2 operations on a single RDD concurrently. I have written code like this

val conf = new SparkConf().setAppName("Foo")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")
val inputPath = path
val rdd = sc.textFile(inputPath).cache()

val f1 = Future {
  val schema1 = StructType(List(StructField("a", StringType, true), StructField("b", StringType, true), StructField("c", LongType, true)))
  val rdd1 = rdd.map(func1).filter(_.isDefined).flatMap(x => x)
  val df1 = sqlSc.createDataFrame(rdd1, schema1)
  df1.save("/foo/", "com.databricks.spark.avro")
  0
}

val f2 = Future {
  val schema2 = StructType(List(StructField("d", StringType, true), StructField("e", StringType, true)))
  val rdd2 = rdd.map(func2).filter(_.isDefined).flatMap(x => x)
  val df2 = sqlSc.createDataFrame(rdd2, schema2)
  df2.save("/bar/", "com.databricks.spark.avro")
  0
}

val result = for {
  r1 <- f1
  r2 <- f2
} yield(r1 + r2)

result onSuccess{
  case r => println("done")
}

Await.result(result, Duration.Inf)
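
(For completeness, the snippet assumes the usual Spark and Scala concurrency imports plus an implicit ExecutionContext on which the two Futures run; roughly something like this:)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
// implicit ExecutionContext on which the two Future bodies run
import scala.concurrent.ExecutionContext.Implicits.global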

When I run this code, I don't see the desired effect. The directory /bar/ has lots of temporary files and so on, but /foo/ has nothing, so it seems the two datasets are not being created in parallel.

Is it a good idea to use a Future inside the Spark driver? Am I doing it correctly? Should I do anything differently?

Upvotes: 3

Views: 2273

Answers (1)

Anish

Reputation: 71

To execute two or more Spark jobs (actions) in parallel, the SparkContext needs to be running in FAIR scheduler mode.

In the driver program, transformations only build the dependency graph; the actual execution happens only when an action is called. The driver then waits while the work runs on the worker nodes. In your case the second job does not start executing until the first one is over, because by default Spark schedules jobs within an application in FIFO order, so the first submitted job gets priority over the cluster resources.

You can set the configuration as follows to enable parallel execution:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

For details, see "Scheduling Within an Application" in the Spark documentation.
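
Putting this together with the code from the question, a minimal sketch might look like the one below. The pool names are hypothetical; sc.setLocalProperty("spark.scheduler.pool", ...) assigns the jobs submitted from that thread to a named fair-scheduler pool, and pools not declared in a fairscheduler.xml are created with default settings.

import org.apache.spark.{SparkConf, SparkContext}

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val conf = new SparkConf().setAppName("Foo").set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

val f1 = Future {
  // jobs submitted from this thread go into "pool1" (hypothetical pool name)
  sc.setLocalProperty("spark.scheduler.pool", "pool1")
  // ... build rdd1/df1 and save it to /foo/ as in the question ...
  0
}

val f2 = Future {
  sc.setLocalProperty("spark.scheduler.pool", "pool2")
  // ... build rdd2/df2 and save it to /bar/ as in the question ...
  0
}

// wait for both save jobs to finish before the driver exits
Await.result(f1.zip(f2), Duration.Inf)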

Upvotes: 1
