Reputation: 13
We are sending 15 records from Kafka to Spark Streaming, but Spark is receiving only 11 records. I am using Spark 2.1.0 and kafka_2.12-0.10.2.0.
Code:
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
public class KafkaToSparkData {
    public static void main(String[] args) throws InterruptedException {
        int timeDuration = 100;
        int consumerNumberOfThreads = 1;
        String consumerTopic = "InputDataTopic";
        String zookeeperUrl = "localhost:2181";
        String consumerTopicGroup = "testgroup";
        String producerKafkaUrl = "localhost:9092";
        String producerTopic = "OutputDataTopic";
        String sparkMasterUrl = "local[2]";

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(consumerTopic, consumerNumberOfThreads);

        SparkSession sparkSession = SparkSession.builder().master(sparkMasterUrl).appName("Kafka-Spark").getOrCreate();
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(javaSparkContext, new Duration(timeDuration));

        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zookeeperUrl, consumerTopicGroup, topicMap);

        JavaDStream<String> NewRecord = messages.map(new Function<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 1L;

            public String call(Tuple2<String, String> line) throws Exception {
                String responseToKafka = "";
                System.out.println(" Data IS " + line);

                String ValueData = line._2;
                responseToKafka = ValueData + "|" + "0";

                Properties configProperties = new Properties();
                configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerKafkaUrl);
                configProperties.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
                configProperties.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class);

                KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configProperties);
                ProducerRecord<String, String> topicMessage = new ProducerRecord<String, String>(producerTopic, responseToKafka);
                producer.send(topicMessage);
                producer.close();

                return responseToKafka;
            }
        });

        System.out.println(" Printing Record");
        NewRecord.print();

        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
        javaStreamingContext.close();
    }
}
Kafka Producer (input sent):
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic InputDataTopic
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18

Kafka Consumer (output received):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic OutputDataTopic --from-beginning
1|0 2|0 3|0 4|0 5|0 6|0 7|0 8|0 9|0 10|0 11|0
Could somebody help me with this?
Upvotes: 0
Views: 1387
Reputation: 37435
What we see here is an effect of how Spark's lazy evaluation works.

Here, a map operation is used to cause a side effect, namely, sending some data to Kafka. The stream is then materialized using print. By default, print shows the first 10 elements of the stream, but it takes n+1 elements in order to show a "..." indicating that there are more. This take(11) forces the materialization of the first 11 elements, so only those are taken from the original stream and processed with the map function. The result is that only part of the data gets published to Kafka.
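As a quick illustration (not the fix), the number of records that reach Kafka simply tracks whatever the output operation happens to materialize. For example, asking print for more elements from the question's NewRecord stream would also push more records through the side effect in map:

// Illustration only: print(n) takes n+1 elements per batch, so asking for more
// elements also triggers the Kafka send inside map for more records.
NewRecord.print(20);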
How to solve this? Well, the hint is already above: do not use side effects in a map function.

In this case, the correct output operation to consume the stream and send it to Kafka is foreachRDD. Furthermore, to avoid creating a Kafka producer instance for each element, we process each partition of the underlying RDD with foreachPartition, so that one producer is created per partition.

A code skeleton of this process (in Scala) looks like this:
messages.foreachRDD { rdd =>
  rdd.foreachPartition { partitionIter =>
    val producer = createProducer()   // placeholder: build a KafkaProducer with the broker config
    partitionIter.foreach { elem =>
      val record = createRecord(elem) // placeholder: build the ProducerRecord from the element
      producer.send(record)
    }
    producer.flush()
    producer.close()
  }
}
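Since the question's code is in Java, here is a minimal sketch of the same pattern in Java, reusing the variables from the question (messages, producerKafkaUrl, producerTopic); treat it as an outline under those assumptions, not a drop-in implementation:

messages.foreachRDD(rdd -> {
    rdd.foreachPartition(partitionIter -> {
        // Build the producer configuration once per partition, not once per record
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerKafkaUrl);
        props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
        props.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class);

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        while (partitionIter.hasNext()) {
            Tuple2<String, String> line = partitionIter.next();
            // Same transformation as in the question: append "|0" to the value
            producer.send(new ProducerRecord<String, String>(producerTopic, line._2 + "|" + "0"));
        }
        producer.flush();
        producer.close();
    });
});

With foreachRDD as the output operation, every element of every batch is sent to Kafka, independent of how many elements print happens to materialize; print can still be kept purely for debugging.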
Upvotes: 2