steven

Reputation: 683

Kafka on Spark only reads realtime ingestion

Spark version = 2.3.0

Kafka version = 1.0.0

Snippet of the code being used:

# Kafka endpoints
zkQuorum = '192.168.2.10:2181,192.168.2.12:2181' 
topic = 'Test_topic'

# Create a Kafka stream
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "cyd-demo-azureactivity-streaming-consumer", {topic: 1})

When the Kafka stream is run in real time, I see Spark pulling data; however, if I start Kafka, say, an hour before Spark, it will not pick up the hour-old data.

Is this expected or is there a way to set something up in a configuration?

Code run using:

sudo $SPARK_HOME/spark-submit --master local[2] --jars /home/steven/jars/elasticsearch-hadoop-6.3.2.jar,/home/steven/jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /home/steven/code/demo/test.py

Upvotes: 0

Views: 90

Answers (1)

Aman Juneja

Reputation: 151

If you always need to pull the data from the start, then you need to set the Kafka consumer property "auto.offset.reset" to "earliest". This will make the consumer read records from the beginning whenever no committed offset exists for the group.

This param is a Kafka consumer config - http://kafka.apache.org/documentation.html#newconsumerconfigs

Reference link - https://spark.apache.org/docs/2.3.0/streaming-kafka-0-10-integration.html

There are multiple overloads of createStream; you can use the one that accepts the Kafka configs. Sample code for createStream -

import kafka.serializer.StringDecoder

val kafkaParams = Map(
  "zookeeper.connect" -> "zookeeper1:2181",
  "group.id" -> "spark-streaming-test",
  // note: the 0.8 "old" consumer (used by the receiver-based stream)
  // spells this value "smallest" rather than "earliest"
  "auto.offset.reset" -> "earliest"
)

val inputTopic = "input-topic"

// the generic overload needs explicit key/value types and decoders
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map(inputTopic -> 1), StorageLevel.MEMORY_ONLY_SER)
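Since the question uses PySpark, here is the same idea as a Python sketch (untested against a live cluster; the ZooKeeper quorum, group id, and topic are taken from the question, and the value "smallest" is the 0.8 old-consumer spelling of "earliest", which is what the receiver-based createStream uses):

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="kafka-earliest-demo")
ssc = StreamingContext(sc, 10)  # 10-second batches

# Extra consumer configs; only applied when the group has no committed offset
kafkaParams = {"auto.offset.reset": "smallest"}

kafkaStream = KafkaUtils.createStream(
    ssc,
    "192.168.2.10:2181,192.168.2.12:2181",           # zkQuorum from the question
    "cyd-demo-azureactivity-streaming-consumer",     # consumer group
    {"Test_topic": 1},                               # topic -> number of threads
    kafkaParams=kafkaParams,
)
```

Keep in mind that the receiver-based stream commits offsets to ZooKeeper for the consumer group, so auto.offset.reset only takes effect the first time a group runs; to re-read old data with the same group, the stored offsets would have to be cleared.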

Upvotes: 3
