Srinivasarao Daruna

Reputation: 3374

How do I pass SparkContext to a function from foreach?

I need to pass a SparkContext to my function; please suggest how to do that for the scenario below.

I have a Sequence; each element refers to a specific data source, from which we get an RDD and process it. I have defined a function that takes a Spark context and the data source and does the necessary work. I am currently using a while loop, but I would like to do it with foreach or map so that the sources are processed in parallel. The function needs the Spark context, but how can I pass it in from the foreach?

Just SAMPLE code, as I cannot present the actual code:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object RoughWork {
  def main(args: Array[String]) {

    val str = "Hello,hw:How,sr:are,ws:You,re"
    val conf = new SparkConf
    conf.setMaster("local")
    conf.setAppName("app1")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val rdd = sc.parallelize(str.split(":"))
    // This is the part in question: using sc inside a transformation
    rdd.map(x => { println("==>" + x); passTest(sc, x) }).collect()
  }

  def passTest(context: SparkContext, input: String) {
    val rdd1 = context.parallelize(input.split(","))
    rdd1.foreach(println)
  }
}

Upvotes: 0

Views: 5854

Answers (1)

sgvd

Reputation: 3939

You cannot pass the SparkContext around like that: passTest would run on the executors, while the SparkContext lives only on the driver. The closure shipped to the executors has to be serializable, SparkContext is not, and nested RDD operations inside a transformation are not supported anyway.
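To make the distinction concrete, a minimal sketch (mine, not part of the original answer): if each element of your Sequence is just a reference to a data source, iterate over it as a plain local collection on the driver, where sc is in scope. Here `sources` is a hypothetical stand-in for that Sequence.

// Hypothetical sketch: a plain Scala foreach on the driver, not an RDD
// operation, so sc is available inside the loop body.
val sources = Seq("Hello,hw", "How,sr", "are,ws", "You,re")
sources.foreach(src => passTest(sc, src))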

If I had to do a double split like that, one approach would be to use flatMap:

rdd
  .zipWithIndex                // pair each ":"-separated line with its index
  .flatMap(l => {
    val parts = l._1.split(",")               // split the line into items
    List.fill(parts.length)(l._2) zip parts   // one (index, item) pair per item
  })
  .countByKey                  // Map of line index -> number of items

There may be prettier ways, but basically the idea is that you can use zipWithIndex to keep track of which line an item came from and then use key-value pair RDD methods to work on your data.
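For example, applied to the sample string from the question (a quick sketch; the val names are mine):

val rdd = sc.parallelize("Hello,hw:How,sr:are,ws:You,re".split(":"))
val counts = rdd
  .zipWithIndex
  .flatMap(l => {
    val parts = l._1.split(",")
    List.fill(parts.length)(l._2) zip parts
  })
  .countByKey
// counts: Map(0 -> 2, 1 -> 2, 2 -> 2, 3 -> 2)
// i.e. each ":"-separated line contributed two ","-separated items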

If you have more than one key, or just more structured data in general, you can look into using Spark SQL with DataFrames (or Datasets in later versions) and explode instead of flatMap.
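A hedged sketch of that DataFrame route, assuming Spark 1.5+ and the sc, sqlContext, and str values from the question's sample:

import org.apache.spark.sql.functions.{explode, split}
import sqlContext.implicits._

val df = sc.parallelize(str.split(":"))
  .zipWithIndex
  .map(_.swap)              // (line index, line text)
  .toDF("line", "text")

df.select($"line", explode(split($"text", ",")).as("word"))
  .groupBy("line")          // same per-line counts as the flatMap version
  .count()
  .show()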

Upvotes: 2
