Sporty
Sporty

Reputation: 761

How to run concurrent jobs(actions) in Apache Spark using single spark context


It says in Apache Spark documentation "within each Spark application, multiple “jobs” (Spark actions) may be running concurrently if they were submitted by different threads". Can someone explain how to achieve this concurrency for the following sample code?

    SparkConf conf = new SparkConf().setAppName("Simple_App");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> file1 = sc.textFile("/path/to/test_doc1");
    JavaRDD<String> file2 = sc.textFile("/path/to/test_doc2");

    System.out.println(file1.count());
    System.out.println(file2.count());

These two jobs are independent and must run concurrently.
Thank You.

Upvotes: 22

Views: 22656

Answers (2)

Tagar
Tagar

Reputation: 14891

Using scala parallel collections feature

Range(0,10).par.foreach {
  project_id => 
      {
        spark.table("store_sales").selectExpr(project_id+" as project_id", "count(*) as cnt")
        .write
        .saveAsTable(s"counts_$project_id")
    }
}

PS. Above would launch up to 10 parallel Spark jobs but it could be less depending on number of available cores on Spark Driver. Above method by GQ using Futures is more flexible in this regard.

Upvotes: 0

G Quintana
G Quintana

Reputation: 4667

Try something like this:

    final JavaSparkContext sc = new JavaSparkContext("local[2]","Simple_App");
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    // Start thread 1
    Future<Long> future1 = executorService.submit(new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            JavaRDD<String> file1 = sc.textFile("/path/to/test_doc1");
            return file1.count();
        }
    });
    // Start thread 2
    Future<Long> future2 = executorService.submit(new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            JavaRDD<String> file2 = sc.textFile("/path/to/test_doc2");
            return file2.count();
        }
    });
    // Wait thread 1
    System.out.println("File1:"+future1.get());
    // Wait thread 2
    System.out.println("File2:"+future2.get());

Upvotes: 24

Related Questions