I am very new to Spark Streaming and I am implementing a small exercise: sending XML data from Kafka and receiving that streaming data through Spark Streaming. I have tried everything I can think of, but every time I get empty values.
There is no problem on the Kafka side; the only problem is receiving the streaming data on the Spark side.
Here is how I am implementing it:
```java
package com.example; // "package" is a reserved word in Java, so "com.package" does not compile

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SparkStringConsumer {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // One micro-batch every 2 seconds
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        Set<String> topics = Collections.singleton("mytopic");

        // Direct (receiver-less) stream; keys and values are decoded as Strings
        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        directKafkaStream.foreachRDD(rdd -> {
            System.out.println("--- New RDD with " + rdd.partitions().size()
                    + " partitions and " + rdd.count() + " records");
            // record is a (key, value) pair; record._2 is the message value
            rdd.foreach(record -> System.out.println(record._2));
        });

        ssc.start();
        ssc.awaitTermination();
    }
}
```
I am using the following versions:

- **ZooKeeper 3.4.6**
- **Scala 2.11**
- **Spark 2.0**
- **Kafka 0.8.2**
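Since the exercise sends XML documents as Kafka message values, each non-empty `record._2` in the code above is an XML document held in a `String`. As a minimal sketch (not part of the original question), the JDK's built-in DOM parser can turn one such value into a document and read an element from it. The message shape `<order><id>42</id><item>book</item></order>` and the names `XmlValueParser` and `firstText` are hypothetical, chosen only for illustration.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

public class XmlValueParser {
    // Parses one message value (record._2 in the Spark code) into a DOM document.
    static Document parse(String xml) throws Exception {
        DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
        return builder.parse(new InputSource(new StringReader(xml)));
    }

    // Returns the text of the first element named `tag`, or null if there is none.
    static String firstText(String xml, String tag) throws Exception {
        NodeList nodes = parse(xml).getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent();
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical message; the real XML sent to "mytopic" is not shown in the question.
        String value = "<order><id>42</id><item>book</item></order>";
        System.out.println(firstText(value, "item")); // prints "book"
    }
}
```

Inside the stream this could be called as `rdd.foreach(record -> System.out.println(XmlValueParser.firstText(record._2, "item")));`, which compiles because Spark's `VoidFunction.call` is declared to throw `Exception`. It only helps once the batches actually contain records.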
You can do it like this:
```java
directKafkaStream.foreachRDD(rdd -> {
    rdd.foreachPartition(item -> {
        // item iterates over the (key, value) records of one partition
        while (item.hasNext()) {
            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>" + item.next());
        }
    });
});
```
`item.next()` returns a key/value pair (a `scala.Tuple2`), and you can get the value with `item.next()._2`.
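One detail worth keeping in mind with this loop: every call to `item.next()` advances the iterator, so printing `item.next()` and then reading `item.next()._2` in the same iteration would consume two records and skip every other one. The sketch below shows the call-once pattern using `java.util.Map.Entry` as a stand-in for `scala.Tuple2` so that it runs without Spark; `PartitionLoop` and `values` are hypothetical names. In the Spark version the loop body would be `Tuple2<String, String> record = item.next(); System.out.println(record._2);`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class PartitionLoop {
    // Drains one partition's iterator, calling next() exactly once per record.
    static List<String> values(Iterator<Map.Entry<String, String>> partition) {
        List<String> out = new ArrayList<>();
        while (partition.hasNext()) {
            Map.Entry<String, String> record = partition.next(); // advance once, keep the pair
            out.add(record.getValue());                          // Spark equivalent: record._2
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = new ArrayList<>();
        // Messages sent without a key arrive with a null key.
        records.add(new SimpleEntry<>(null, "<msg>a</msg>"));
        records.add(new SimpleEntry<>(null, "<msg>b</msg>"));
        System.out.println(values(records.iterator())); // prints [<msg>a</msg>, <msg>b</msg>]
    }
}
```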
Your Spark Streaming application looks OK. I tested it and it prints the Kafka messages. You can also add the "Message Received" print statement below to verify that Kafka messages are arriving.
```java
directKafkaStream.foreachRDD(rdd -> {
    System.out.println("Message Received " + rdd.values().take(5));
    System.out.println("--- New RDD with " + rdd.partitions().size()
            + " partitions and " + rdd.count() + " records");
    rdd.foreach(record -> System.out.println(record._2));
});
```
If you are using ZooKeeper, add it to the Kafka parameters as well:

```java
kafkaParams.put("zookeeper.connect", "localhost:2181");
```
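Putting the parameters together, here is a minimal sketch of a helper that builds the `kafkaParams` map passed to `KafkaUtils.createDirectStream`. The `auto.offset.reset` entry is an addition that neither answer mentions: with the Kafka 0.8 direct stream, the job starts reading from the latest offset unless `auto.offset.reset` is set to `smallest`, so messages produced before `ssc.start()` are never read. That is one possible cause of empty batches, not a confirmed diagnosis of this question. `KafkaParamsSketch`, `kafkaParams` and `fromBeginning` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaParamsSketch {
    // Builds the kafkaParams map for KafkaUtils.createDirectStream (Kafka 0.8 integration).
    static Map<String, String> kafkaParams(String brokers, String zookeeper, boolean fromBeginning) {
        Map<String, String> params = new HashMap<>();
        params.put("metadata.broker.list", brokers); // broker list, as in the question
        if (zookeeper != null) {
            params.put("zookeeper.connect", zookeeper); // only when ZooKeeper is used, as the answer suggests
        }
        if (fromBeginning) {
            // Assumption for diagnosis: "smallest" makes the 0.8 direct stream start from the
            // earliest available offset instead of the default latest offset.
            params.put("auto.offset.reset", "smallest");
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = kafkaParams("localhost:9092", "localhost:2181", true);
        // HashMap iteration order is unspecified, so look entries up by key.
        System.out.println("brokers   = " + params.get("metadata.broker.list"));
        System.out.println("zookeeper = " + params.get("zookeeper.connect"));
        System.out.println("reset     = " + params.get("auto.offset.reset"));
    }
}
```

If the batches start showing records after passing `kafkaParams("localhost:9092", "localhost:2181", true)` in place of the hand-built map, the messages were most likely produced before the stream started.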
I don't see the following import statements in your program, so I'm adding them here:

```java
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;
```
Please also verify that you can consume messages from the topic "mytopic" with the command-line kafka-console-consumer.