This question might be a duplicate.
I want to listen to a Kafka topic from Spark and pass the content to an Ignite cache.
I would like to achieve the same thing as described in Performance Tuning of an Apache Kafka/Spark Streaming System.
I used KafkaUtils.createDirectStream() to read the Kafka topic in Spark, and IgniteRDD to push the data into the Ignite cache. But the system threw the following error:
org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
Here is the code:

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf();
        conf.set("spark.driver.allowMultipleContexts", "true");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Context for Kafka
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        // Creates Ignite context with specific configuration and runs Ignite in the embedded mode.
        IgniteContext igniteContext = new IgniteContext(,"/home/ec2-user/apache-ignite-fabric-2.6.0-bin/config/default-config.xml", false);

        // Adjust the logger to exclude the logs of no interest.

        // Define data to be stored in the Ignite RDD (cache).
        List<Integer> data = new ArrayList<>(20);
        for (int i = 0; i < 20; i++) {
            data.add(i);
        }

        Set<String> topics = Collections.singleton("Hello-Kafka");
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("", "");

        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        directKafkaStream.foreachRDD(rdd -> {
            // Create a Java Ignite RDD of Type (Int,Int) Integer Pair.
            IgniteRDD sharedRDD = igniteContext.fromCache("hello-spark");
            // Preparing a Java RDD
            JavaRDD<String> javaRDD = sc.parallelize(Collections.singletonList("Hello-world"));
            System.out.println("--- New RDD with " + rdd.partitions().size() + " partitions and " + rdd.count() + " records");
            rdd.foreach(record -> {
                // Displaying Kafka topic
                System.out.println("Got the record : " + record._2);
                // Pushing values to Ignite
                sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() {
                    @Override public Tuple2<Integer, Integer> call(Integer val) throws Exception {
                        return new Tuple2<Integer, Integer>(val, val);
                    }
                }));
            });
        });

        ssc.start();
        ssc.awaitTermination();
    }
I am not able to find out what is missing in the code. Is this approach right, or should I use another approach? Please guide me.
Your example may be reduced to the following code:
    JavaRDD<Integer> rdd = sparkCtx.parallelize(Arrays.asList(1, 2, 3));
    JavaRDD<Integer> javaRDD = sparkCtx.parallelize(Arrays.asList(4, 5, 6));
    JavaIgniteRDD<Integer, Integer> sharedRDD = igniteCtx.fromCache("hello-spark");
    rdd.foreach(record ->
        sharedRDD.savePairs(
            javaRDD.mapToPair((PairFunction<Integer, Integer, Integer>) val ->
                new Tuple2<>(val, val))));
I removed Kafka from the equation to simplify the example.
First of all, it is strange that you iterate over the elements of rdd and put the values of javaRDD into sharedRDD, while ignoring the rdd records. rdd and javaRDD are different things, so I don't understand why you do this.
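If the goal is simply to store the Kafka records themselves, javaRDD is not needed at all. Here is a minimal sketch, assuming the Kafka 0.8 direct API from the question and the Java Ignite API (JavaIgniteContext / JavaIgniteRDD) instead of the Scala IgniteContext. The class name, the broker address and the Spring config path are placeholders, not values from the question. The function passed to foreachRDD runs on the driver, so savePairs is called there and is not nested inside another RDD operation:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.ignite.spark.JavaIgniteContext;
import org.apache.ignite.spark.JavaIgniteRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

// Hypothetical class name; a sketch, not a drop-in replacement for the question's job.
public class KafkaToIgnite {
    public static void main(String[] args) throws Exception {
        // Assumption: the master URL is supplied via spark-submit.
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("kafka-to-ignite"));
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        // Assumption: placeholder Spring XML path; "false" = embedded mode, as in the question.
        JavaIgniteContext<String, String> igniteContext =
                new JavaIgniteContext<>(sc, "config/default-config.xml", false);

        Set<String> topics = Collections.singleton("Hello-Kafka");
        Map<String, String> kafkaParams = new HashMap<>();
        // Assumption: a local broker; replace with your own broker list.
        kafkaParams.put("metadata.broker.list", "localhost:9092");

        JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class,
                kafkaParams, topics);

        // This lambda is executed on the driver once per batch, so savePairs
        // is a top-level RDD operation here, not one nested inside rdd.foreach.
        stream.foreachRDD(rdd -> {
            JavaIgniteRDD<String, String> sharedRDD = igniteContext.fromCache("hello-spark");
            // The batch is already a JavaPairRDD<String, String> (Kafka key, value).
            sharedRDD.savePairs(rdd);
        });

        ssc.start();
        ssc.awaitTermination();
    }
}
```

Each Kafka message is stored under its Kafka key, so messages without a key (null key) would need a different key, for example one derived from the value via mapToPair before saving.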
You get the exception because you run the mapToPair operation inside foreach. Both of them are RDD operations, and RDD operations cannot be nested: the closure passed to foreach runs on the executors, where javaRDD has no SparkContext (this is case (1) in the error message). You should either move the savePairs part out of the foreach, or combine rdd and javaRDD in some way that does not require running nested RDD operations. It depends on what you are really trying to achieve.