user3699367
user3699367

Reputation:

Spark DirectStream Issue

I am trying to create a Spark direct stream from Kafka, but when I create the direct stream object I get the following compile error:

The method createDirectStream in the type KafkaUtils is not applicable for the arguments (one of the HashMap parameters that I am passing).

At this line: JavaPairInputDStream directKafkaStream = KafkaUtils.createDirectStream(ssc,String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

Full Code:

package kafkatest2;



import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.commons.codec.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.kafka010.*;
/**
 * Stand-alone driver that builds a local Spark streaming context, opens a
 * direct Kafka stream on a single topic and prints every batch to stdout.
 *
 * <p>NOTE(review): with this file's imports, {@code StringDecoder} resolves to
 * {@code org.apache.commons.codec.StringDecoder}, not a Kafka decoder class.
 * The "createDirectStream ... is not applicable" compile error reported for
 * this listing is attributed to that import.
 */
public class SparkStream {

    /**
     * Entry point: configures Spark, subscribes to the Kafka topic and blocks
     * until the streaming context terminates.
     *
     * @param args command-line arguments; not read by this method
     */
    public static void main(String[] args) {

        // Local-mode Spark configuration; "local[*]" is passed as the master URL.
        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Streaming context layered on the Spark context, batch interval new Duration(2000).
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        // TODO: processing pipeline
       // Kafka connection settings handed to createDirectStream; only the broker list is set.
       Map<String,String> kafkaParams = new HashMap<String,String>();
       kafkaParams.put("metadata.broker.list", "localhost:9092");

        // Immutable one-element set holding the topic to read.
        Set<String> topics = Collections.singleton("topic5");

// Key and value are both declared as String; the two decoder arguments name the
// classes used to decode them (see the class-level note about which StringDecoder this is).
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,String.class,
        String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);


        // For every batch RDD: print a summary line, then print the value (_2) of each record.
        directKafkaStream.foreachRDD(rdd -> {
            System.out.println("--- New RDD with " + rdd.partitions().size()
                    + " partitions and " + rdd.count() + " records");
            rdd.foreach(record -> System.out.println(record._2));
        });
        // Start the streaming computation and block the main thread until it stops.
        ssc.start();
        ssc.awaitTermination();
    }
}

Upvotes: 1

Views: 563

Answers (1)

himanshuIIITian
himanshuIIITian

Reputation: 6085

In your code, the wrong StringDecoder is being used. It should be kafka.serializer.StringDecoder instead of org.apache.commons.codec.StringDecoder.

The correct code is as follows:

package kafkatest2;



import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.kafka010.*;
/**
 * Stand-alone driver that builds a local Spark streaming context, opens a
 * direct Kafka stream on a single topic and prints every batch to stdout.
 *
 * <p>{@code StringDecoder} here is {@code kafka.serializer.StringDecoder}
 * (see the imports), which is the decoder type that
 * {@code KafkaUtils.createDirectStream} accepts for String keys and values.
 */
public class SparkStream {

    /**
     * Entry point: configures Spark, subscribes to the Kafka topic and blocks
     * until the streaming context terminates.
     *
     * @param args command-line arguments; not read by this method
     * @throws InterruptedException if the main thread is interrupted while
     *         waiting in {@code awaitTermination()}; the Spark 2.x Java API
     *         declares this checked exception, so it must be propagated or
     *         caught for the class to compile
     */
    public static void main(String[] args) throws InterruptedException {
        // Local-mode Spark configuration; "local[*]" is passed as the master URL.
        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Streaming context layered on the Spark context, batch interval new Duration(2000).
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        // Kafka connection settings handed to createDirectStream; only the broker list is set.
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");

        // Immutable one-element set holding the topic to read.
        Set<String> topics = Collections.singleton("topic5");

        // Key and value are both decoded as String by kafka.serializer.StringDecoder.
        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(
                ssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topics);

        // TODO: processing pipeline
        // For every batch RDD: print a summary line, then print the value (_2) of each record.
        directKafkaStream.foreachRDD(rdd -> {
            System.out.println("--- New RDD with " + rdd.partitions().size()
                    + " partitions and " + rdd.count() + " records");
            rdd.foreach(record -> System.out.println(record._2));
        });

        // Start the streaming computation and block the main thread until it stops.
        ssc.start();
        ssc.awaitTermination();
    }
}

I hope it helps!

Upvotes: 0

Related Questions