Reputation: 683
Spark version = 2.3.0
Kafka version = 1.0.0
Snippet of code being used:
# Kafka endpoints
zkQuorum = '192.168.2.10:2181,192.168.2.12:2181'
topic = 'Test_topic'
# Create a Kafka stream
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "cyd-demo-azureactivity-streaming-consumer", {topic: 1})
When the Kafka stream is run in real time, I see Spark pulling data; however, if I start Kafka, say, an hour before Spark, it will not pick up the hour-old data.
Is this expected, or is there a way to set something up in a configuration?
Code run using:
sudo $SPARK_HOME/spark-submit --master local[2] --jars /home/steven/jars/elasticsearch-hadoop-6.3.2.jar,/home/steven/jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /home/steven/code/demo/test.py
Upvotes: 0
Views: 90
Reputation: 151
If you always need to pull the data from the start, then you need to set the Kafka property "auto.offset.reset" so the consumer starts from the earliest available offset. For the new consumer API (the 0-10 integration) the value is "earliest"; the old ZooKeeper-based consumer used by createStream expects "smallest". This will pull the records from the start.
This parameter is part of the Kafka consumer configs - http://kafka.apache.org/documentation.html#newconsumerconfigs
Reference link - https://spark.apache.org/docs/2.3.0/streaming-kafka-0-10-integration.html
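Since the question's snippet is PySpark, here is a minimal sketch of the same idea against the 0-8 receiver API (ssc, zkQuorum, topic and the group id are the names from the question's snippet):

from pyspark.streaming.kafka import KafkaUtils

# 'smallest' is the old-consumer equivalent of 'earliest'; it only applies
# when the consumer group has no offsets committed in ZooKeeper yet
kafkaParams = {'auto.offset.reset': 'smallest'}
kafkaStream = KafkaUtils.createStream(
    ssc, zkQuorum, 'cyd-demo-azureactivity-streaming-consumer',
    {topic: 1}, kafkaParams=kafkaParams)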
There are multiple overloads of createStream; you can use the one that lets you pass the Kafka configs. Sample code for createStream -
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// "smallest" tells the old ZooKeeper-based consumer to start from the
// earliest available offset when the group has no committed offsets
val kafkaParams = Map(
  "zookeeper.connect" -> "zookeeper1:2181",
  "group.id" -> "spark-streaming-test",
  "auto.offset.reset" -> "smallest"
)
val inputTopic = "input-topic"
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map(inputTopic -> 1), StorageLevel.MEMORY_ONLY_SER)
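One caveat: the receiver-based createStream commits offsets to ZooKeeper under the group.id, and auto.offset.reset only applies when the group has no committed offsets yet. If the application has already run with the same group id, switch to a fresh group.id to re-read the topic from the beginning.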
Upvotes: 3