Reputation: 235
I am a beginner with Kafka and Spark. I want to do real-time processing on data I receive from Kafka on a specific topic through Spark Streaming. I am not able to use the JavaPairReceiverInputDStream returned by the createStream function.
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("testwordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
    Map<String, Integer> topics_map = new HashMap<String, Integer>();
    topics_map.put("Customtopic", 10);
    JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
            .createStream(jssc, "localhost:2181", "kafkasparkconsumer", topics_map);
The code below gives a compile error:
    JavaPairDStream<String, Integer> wordCounts = kafkaStream.map(
            new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });
    wordCounts.print();
The method map(Function<Tuple2<String,String>,R>) in the type JavaPairDStream<String,String> is not applicable for the arguments (new PairFunction<String,String,Integer>(){})
SparkStreamingKafka.java /Kafka-Spark/src/com/sd/kafka line 43 Java Problem
I am using Spark 1.2.0. I couldn't find Java API examples for handling Kafka messages. Can anyone tell me what I need to change?
Upvotes: 3
Views: 1317
Reputation: 124
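kafkaStream is a stream of tuples (Kafka message key, message value), so the function you pass to map receives a Tuple2, not a String. Check this: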
    JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils
            .createStream(jssc, "localhost:2181", "kafkasparkconsumer", topics_map);
    JavaDStream<String> lines = kafkaStream
            .map(new Function<Tuple2<String, String>, String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public String call(Tuple2<String, String> tuple2) {
                    // _2() is the message value; _1() would be the message key
                    return tuple2._2();
                }
            });
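If you then want individual words rather than whole messages, the next step would roughly be to split each value. Here is a plain-Java sketch of that idea without Spark, so it can be run on its own. Map.Entry stands in for Tuple2, and the class and method names (KafkaValueSketch, values, words) are made up for illustration; splitting on whitespace is an assumption about the message format.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class KafkaValueSketch {
    // Stand-in for the map(...) call above: keep only the message value of each record.
    static List<String> values(List<Map.Entry<String, String>> records) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> record : records) {
            out.add(record.getValue());
        }
        return out;
    }

    // Split every message on whitespace (assumed format) and drop empty tokens.
    static List<String> words(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    out.add(word);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = new ArrayList<>();
        records.add(new SimpleEntry<String, String>("k1", "hello kafka"));
        records.add(new SimpleEntry<String, String>("k2", "hello spark"));
        System.out.println(words(values(records))); // prints [hello, kafka, hello, spark]
    }
}
```

In Spark itself the splitting would normally be done with flatMap on the JavaDStream<String>; the sketch only shows the shape of the data at each step.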
Upvotes: 0
Reputation: 3354
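You are calling the wrong method. In the Java API, map takes a Function; to get a pair stream you should call mapToPair with a PairFunction. Also note that each element of kafkaStream is a Tuple2<String, String>, not a String. Try this code: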
    JavaPairDStream<String, Integer> pairs = kafkaStream
            .mapToPair(new PairFunction<Tuple2<String, String>, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Tuple2<String, String> word) throws Exception {
                    return new Tuple2<String, Integer>(word._2(), 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            });
    pairs.print();
    jssc.start();
    jssc.awaitTermination();
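To make it clearer what this prints, here is a plain-Java sketch (no Spark) of what mapToPair followed by reduceByKey computes for a single batch: every message value becomes (value, 1) and the ones are summed per value. Map.Entry stands in for Tuple2, and BatchCountSketch and countValues are hypothetical names used only for this illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchCountSketch {
    // Counts how often each message value occurs in one batch of (key, value) records.
    static Map<String, Integer> countValues(List<Map.Entry<String, String>> batch) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : batch) {
            // mapToPair step: (value, 1); reduceByKey step: add up the 1s per value.
            counts.merge(record.getValue(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> batch = new ArrayList<>();
        batch.add(new SimpleEntry<String, String>("k1", "hello"));
        batch.add(new SimpleEntry<String, String>("k2", "world"));
        batch.add(new SimpleEntry<String, String>("k3", "hello"));
        System.out.println(countValues(batch)); // prints {hello=2, world=1}
    }
}
```

Keep in mind that reduceByKey on a DStream works per batch, so with a 1 second batch interval the counts start from zero for each new batch rather than accumulating over the whole run.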
Upvotes: 7