samrat

Reputation: 13

Spark Streaming Not reading all Kafka Records

We are sending 15 records from Kafka to Spark Streaming, but Spark is receiving only 11 of them. I am using Spark 2.1.0 and kafka_2.12-0.10.2.0.

CODE

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaToSparkData {
 public static void main(String[] args) throws InterruptedException {
    int timeDuration = 100;
    int consumerNumberOfThreads = 1;
    String consumerTopic = "InputDataTopic";
    String zookeeperUrl = "localhost:2181";
    String consumerTopicGroup =  "testgroup";
    String producerKafkaUrl = "localhost:9092";
    String producerTopic =  "OutputDataTopic";
    String sparkMasterUrl = "local[2]";

    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(consumerTopic, consumerNumberOfThreads);

    SparkSession sparkSession = SparkSession.builder().master(sparkMasterUrl).appName("Kafka-Spark").getOrCreate();

    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

    JavaStreamingContext javaStreamingContext = new JavaStreamingContext(javaSparkContext, new Duration(timeDuration));

    JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zookeeperUrl, consumerTopicGroup, topicMap);

    JavaDStream<String> NewRecord = messages.map(new Function<Tuple2<String, String>, String>() {
        private static final long serialVersionUID = 1L;

        public String call(Tuple2<String, String> line) throws Exception {

            String responseToKafka = "";
            System.out.println(" Data IS " + line);

            String ValueData = line._2;
            responseToKafka = ValueData + "|" + "0";

            Properties configProperties = new Properties();
            configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerKafkaUrl);
            configProperties.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
            configProperties.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class);

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configProperties);

            ProducerRecord<String, String> topicMessage = new ProducerRecord<String, String>(producerTopic,responseToKafka);
            producer.send(topicMessage);
            producer.close();

            return responseToKafka;
        }
    });

    System.out.println(" Printing Record" );
    NewRecord.print();

    javaStreamingContext.start();
    javaStreamingContext.awaitTermination();
    javaStreamingContext.close();

    }
}
Kafka Producer

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic InputDataTopic
# records entered: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18

Kafka Consumer

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic OutputDataTopic --from-beginning
# output received: 1|0 2|0 3|0 4|0 5|0 6|0 7|0 8|0 9|0 10|0 11|0

Could somebody help me on this?

Upvotes: 0

Views: 1387

Answers (1)

maasg

Reputation: 37435

What we see here is an effect of how lazy operations work in Spark. Here, a map operation is being used to cause a side effect, namely, sending some data to Kafka.

The stream is then materialized using print. By default, print shows the first 10 elements of the stream, but it takes n+1 elements in order to print a "..." indicating that there are more.

That take(11) forces the materialization of the first 11 elements, so only they are taken from the original stream and processed with the map function. As a result, the data is only partially published to Kafka.
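(As a quick check of this, not a fix: asking print for more elements per batch pulls more of them through the map. The call below is a hypothetical tweak to the question's code, using its NewRecord stream.)

// Diagnostic only: print(num) takes num + 1 elements per batch,
// so this would force all of the inputs through the map and its side effect.
NewRecord.print(20);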

How to solve this? The hint is already above: DO NOT USE side effects in a map function. In this case, the correct output operation to consume the stream and send its data to Kafka is foreachRDD.

Furthermore, to avoid creating a Kafka producer instance for each element, we process each RDD of the stream with foreachPartition, so that one producer is created per partition.

A code skeleton of this process looks like this:

messages.foreachRDD { rdd =>
  rdd.foreachPartition { partitionIter =>
    // create one producer per partition, on the executor side
    val producer = createProducer()     // placeholder, e.g. new KafkaProducer[String, String](props)
    partitionIter.foreach { elem =>
      val record = createRecord(elem)   // placeholder: build the ProducerRecord for this element
      producer.send(record)
    }
    producer.flush()
    producer.close()
  }
}
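For reference, since the question is written in Java, a rough Java equivalent of that skeleton could look as follows. This is an untested sketch that reuses the question's producerKafkaUrl and producerTopic variables and its imports, and would replace the map and print calls in its main method:

// Output operation: consumes every record of every batch, not just the first 11.
messages.foreachRDD(rdd -> {
    rdd.foreachPartition(partitionIter -> {
        // One producer per partition, created on the executor.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerKafkaUrl);
        props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
        props.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        while (partitionIter.hasNext()) {
            Tuple2<String, String> line = partitionIter.next();
            producer.send(new ProducerRecord<>(producerTopic, line._2 + "|" + "0"));
        }
        producer.flush();
        producer.close();
    });
});

Because foreachRDD is itself an output operation, print is no longer needed to trigger the computation, and every record of each batch gets published rather than only the first 11.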

Upvotes: 2
