Suri

Reputation: 229

Kafka --> Spark Stream - How to split message into records on the Spark Receiver?

Appreciate it if you could throw some light on this issue. I am writing a Spark Streaming receiver to consume messages from Kafka. I am getting a block of messages, which is fine. I am splitting the block of messages on the newline separator to create a new DStream, "msgLines" (code snippet is attached). Now I want to loop through the "msgLines" DStream, line by line, to process each message record (line).

How to do that? Any sample code please?

Thanks much

  JavaPairDStream<String, String> messages = KafkaUtils.createStream(sc, zkQuorum, group, topicMap);

    // Drop the Kafka keys and keep only the message payloads.
    JavaDStream<String> msgBlock = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });

    // Expand each message block into one record per line by
    // splitting the block itself on the newline separator.
    JavaDStream<String> msgLines = msgBlock.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String x) {
          return Lists.newArrayList(x.split("\n"));
        }
    });
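Aside: the flatMap body boils down to a plain newline split. A minimal plain-Java sketch (outside Spark, class and method names are illustrative only) of what one message block expands to, assuming blocks really are newline-separated:

```java
import java.util.Arrays;
import java.util.List;

public class SplitDemo {
    // Mirrors the flatMap body: expand one Kafka message block into lines.
    static List<String> splitBlock(String block) {
        return Arrays.asList(block.split("\n"));
    }

    public static void main(String[] args) {
        List<String> lines = splitBlock("rec1\nrec2\nrec3");
        for (String line : lines) {
            System.out.println(line); // prints rec1, rec2, rec3 on separate lines
        }
    }
}
```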

Upvotes: 0

Views: 1137

Answers (1)

SMN

Reputation: 46

You can do something like the code below. Here I am just printing the messages to the console; instead, you can store them in any external store.

    msgLines.foreachRDD(new VoidFunction<JavaRDD<String>>() {

        private static final long serialVersionUID = -2185091572028321496L;

        @Override
        public void call(JavaRDD<String> rdd) throws Exception {

            // Iterate each partition's records on the executor, so any
            // per-partition setup (e.g. a store connection) happens once.
            rdd.foreachPartition(new VoidFunction<Iterator<String>>() {

                @Override
                public void call(Iterator<String> msgs) throws Exception {
                    while (msgs.hasNext())
                        System.out.println(msgs.next());
                }
            });
        }
    });
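If you do write to an external store, the foreachPartition hook is the natural place to batch records. A plain-Java sketch of that per-partition pattern (the `writeBatch` sink and class name are hypothetical, standing in for a real client such as JDBC or HBase):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class PartitionWriterDemo {
    // Hypothetical sink: drains the partition iterator into one batch,
    // which a real client would write in a single call.
    static List<String> writeBatch(Iterator<String> msgs) {
        List<String> batch = new ArrayList<>();
        while (msgs.hasNext()) {
            batch.add(msgs.next());
        }
        // A real implementation would open one connection here,
        // write `batch`, and close the connection.
        return batch;
    }

    public static void main(String[] args) {
        List<String> written = writeBatch(List.of("rec1", "rec2").iterator());
        System.out.println(written.size()); // prints 2
    }
}
```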

Upvotes: 0
