user6325753

Reputation: 577

Getting empty values while receiving from Kafka in Spark Streaming

I am very new to Spark Streaming and I am implementing a small exercise: sending XML data from Kafka and receiving that streaming data through Spark Streaming. I have tried all possible ways, but every time I am getting empty values.

There is no problem on the Kafka side; the only problem is receiving the streaming data on the Spark side.

Here is how I am implementing it:

package com.package;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SparkStringConsumer {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        Set<String> topics = Collections.singleton("mytopic");

        // Create a direct stream of (key, value) string pairs from Kafka
        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
        directKafkaStream.foreachRDD(rdd -> {
            System.out.println("--- New RDD with " + rdd.partitions().size()
                    + " partitions and " + rdd.count() + " records");
            rdd.foreach(record -> System.out.println(record._2));
        });

        ssc.start();
        ssc.awaitTermination();
    }
}

And I am using the following versions:

Zookeeper 3.4.6
Scala 2.11
Spark 2.0
Kafka 0.8.2

Upvotes: 1

Views: 1714

Answers (2)

ROOT

Reputation: 1775

You can try it like this:

directKafkaStream.foreachRDD(rdd -> {
    rdd.foreachPartition(item -> {
        while (item.hasNext()) {
            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>" + item.next());
        }
    });
});

Each item.next() returns a key-value pair (a Tuple2), and you can get the value with ._2.
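For clarity, here is a minimal sketch of that loop that calls next() only once per record and then reads the value from the resulting Tuple2 (calling item.next() twice inside the loop would consume two records at a time; this assumes import scala.Tuple2 and the directKafkaStream from the question):

directKafkaStream.foreachRDD(rdd -> {
    rdd.foreachPartition(item -> {
        while (item.hasNext()) {
            // next() advances the iterator, so call it once and reuse the result
            Tuple2<String, String> record = item.next();
            System.out.println("key = " + record._1() + ", value = " + record._2());
        }
    });
});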

Upvotes: 1

abaghel
abaghel

Reputation: 15317

Your Spark Streaming application looks OK. I tested it and it is printing the Kafka messages. You can also try the "Message Received" print statement below to verify the Kafka messages.

    directKafkaStream.foreachRDD(rdd -> {
        System.out.println("Message Received " + rdd.values().take(5));
        System.out.println("--- New RDD with " + rdd.partitions().size()
                + " partitions and " + rdd.count() + " records");
        rdd.foreach(record -> System.out.println(record._2));
    });

If you are using Zookeeper, then set it in the Kafka params as well:

kafkaParams.put("zookeeper.connect","localhost:2181");

I don't see the following import statements in your program, so I am adding them here:

import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;

Please also verify that you can consume messages from the topic "mytopic" using the command-line kafka-console-consumer.
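For reference, with a Kafka 0.8.x installation the console consumer reads from Zookeeper, so the check would look something like this (assuming the localhost addresses used above):

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning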

Upvotes: 0
