Saurabh Mishra

Reputation: 1713

SparkContext parallelize invocation example in java

I am getting started with Spark and ran into an issue while trying to implement the simple map-function example. The issue is with the definition of 'parallelize' in the new version of Spark. Can someone share an example of how to use it, since the following way gives an error for insufficient arguments?

Spark version: 2.3.2, Java: 1.8

SparkSession session = SparkSession.builder()
        .appName("Compute Square of Numbers")
        .config("spark.master", "local")
        .getOrCreate();
SparkContext context = session.sparkContext();
List<Integer> seqNumList = IntStream.rangeClosed(10, 20).boxed().collect(Collectors.toList());
JavaRDD<Integer> numRDD = context.parallelize(seqNumList, 2); // compile error here

Compile-time error message: the method expects 3 arguments

I do not understand what the third argument should look like. According to the documentation, it is supposed to be

scala.reflect.ClassTag<T>

But how do I even define or use it?

Please do not suggest using JavaSparkContext, as I want to know how to make this approach work with the generic SparkContext.

Ref: https://spark.apache.org/docs/2.2.1/api/java/org/apache/spark/SparkContext.html#parallelize-scala.collection.Seq-int-scala.reflect.ClassTag-

Upvotes: 3

Views: 4158

Answers (2)

Sapience

Reputation: 1638

If you don't want to deal with supplying the extra two parameters to SparkContext.parallelize, you can use JavaSparkContext.parallelize() instead, which only needs the input list:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;

// Wrap the session's SparkContext in the Java-friendly API
JavaSparkContext jsc = new JavaSparkContext(session.sparkContext());
final RDD<Integer> rdd = jsc
        .parallelize(seqNumList)
        .map(num -> num * num) // your transformation, e.g. squaring each number
        .rdd();                // only needed if you want the Scala RDD back
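JavaSparkContext supplies the ClassTag for you internally, which is why the extra arguments disappear. If you do not specifically need the Scala RDD, you can also stay in the Java API the whole way (a sketch reusing seqNumList from the question):

JavaRDD<Integer> squares = jsc.parallelize(seqNumList).map(num -> num * num);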

Upvotes: 1

Saurabh Mishra

Reputation: 1713

Here is the code that finally worked for me. It is not the best way to achieve the result, but it was a way for me to explore the API.

import scala.collection.JavaConverters;
import scala.reflect.ClassTag$;

SparkSession session = SparkSession.builder()
        .appName("Compute Square of Numbers")
        .config("spark.master", "local")
        .getOrCreate();

SparkContext context = session.sparkContext();

List<Integer> seqNumList = IntStream.rangeClosed(10, 20).boxed().collect(Collectors.toList());

// Convert the Java List to a Scala Seq and supply the ClassTag explicitly
RDD<Integer> numRDD = context.parallelize(
        JavaConverters.asScalaIteratorConverter(seqNumList.iterator()).asScala().toSeq(),
        2,
        ClassTag$.MODULE$.apply(Integer.class));

numRDD.toJavaRDD().foreach(x -> System.out.println(x));
session.stop();
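If you need this call in more than one place, the conversion boilerplate can be hidden behind a small helper. A minimal sketch, assuming the same Spark 2.3.2 setup; the helper name parallelize is my own, and asScalaBufferConverter is just a slightly shorter List-to-Seq conversion than the iterator route:

import java.util.List;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import scala.collection.JavaConverters;
import scala.reflect.ClassTag$;

// Hypothetical helper: converts the List and builds the ClassTag in one place
static <T> RDD<T> parallelize(SparkContext sc, List<T> list, int numSlices, Class<T> cls) {
    return sc.parallelize(
            JavaConverters.asScalaBufferConverter(list).asScala().toSeq(),
            numSlices,
            ClassTag$.MODULE$.<T>apply(cls));
}

// Usage:
RDD<Integer> numRDD2 = parallelize(context, seqNumList, 2, Integer.class);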

Upvotes: 3
