Reputation: 229
I'd appreciate it if you could shed some light on this issue. I am writing a Spark Streaming receiver to consume messages from Kafka. I receive a block of messages, which is fine. I am splitting the block of messages on the newline separator to create a new DStream, "msgLines" (code snippet attached). Now I want to loop through the "msgLines" DStream line by line to get each message record (line) and process it.
How can I do that? Any sample code, please?
Thanks much
JavaPairDStream<String, String> messages = KafkaUtils.createStream(sc, zkQuorum, group, topicMap);
//
// Extract the message value from each (key, value) Kafka tuple.
JavaDStream<String> msgBlock = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});
//
// Split each message block on newlines, emitting one line per DStream element.
// Note: split the incoming block x itself, not the literal string "\n".
JavaDStream<String> msgLines = msgBlock.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String x) {
        return Lists.newArrayList(x.split("\n"));
    }
});
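As an aside, the original `flatMap` body called `SPACE.split("\n")`, which applies the SPACE pattern to the literal string `"\n"` rather than splitting the message block. The intended newline split can be checked in plain Java, without Spark (the sample block below is hypothetical):

```java
import java.util.Arrays;
import java.util.List;

public class SplitDemo {
    public static void main(String[] args) {
        // Hypothetical message block, as it might arrive from Kafka.
        String block = "record1\nrecord2\nrecord3";
        // Split the block itself on newlines, one element per record.
        List<String> lines = Arrays.asList(block.split("\n"));
        for (String line : lines) {
            System.out.println(line);
        }
    }
}
```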
Upvotes: 0
Views: 1137
Reputation: 46
You can do something like the following. Here I am just printing the messages to the console; instead, you could store them in any external store.
msgLines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    private static final long serialVersionUID = -2185091572028321496L;

    @Override
    public void call(JavaRDD<String> rdd) throws Exception {
        rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
            @Override
            public void call(Iterator<String> msgs) throws Exception {
                while (msgs.hasNext()) {
                    System.out.println(msgs.next());
                }
            }
        });
    }
});
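One reason `foreachPartition` is used here rather than a per-record `foreach` is that it lets you open one connection to the external store per partition instead of one per message. That pattern can be sketched in plain Java without Spark; the `Connection` class below is a hypothetical stand-in for a real external-store client:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PartitionDemo {
    // Hypothetical stand-in for an external-store connection.
    static class Connection {
        void write(String msg) { System.out.println("stored: " + msg); }
        void close() { System.out.println("connection closed"); }
    }

    public static void main(String[] args) {
        // Two lists standing in for the partitions of an RDD.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("msg-a", "msg-b"),
                Arrays.asList("msg-c"));
        for (List<String> partition : partitions) {
            Connection conn = new Connection(); // one connection per partition
            Iterator<String> msgs = partition.iterator();
            while (msgs.hasNext()) {
                conn.write(msgs.next());
            }
            conn.close();
        }
    }
}
```

In real Spark code the same structure appears inside the `foreachPartition` callback: open the connection before the `while` loop, write each message, and close it afterwards.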
Upvotes: 0