yoga

Reputation: 1959

Recover lost messages in Kafka using offsets

I was asked this question in an interview.

Imagine a message was lost because of a failure (I'm not sure whether it was a consumer failure or a broker failure). What should be done (code implementation) to recover the lost messages using offsets?

I'm sorry if my question is not clear; it was asked to me in roughly this form.

thanks

Upvotes: 1

Views: 2450

Answers (3)

yoga

Reputation: 1959

After reading a lot of articles and documentation, I feel the best answer might be:

Use the new receiver-less Spark Kafka consumer (spark-streaming-kafka-0-10_2.11). With this approach we can give a starting offset from which to read.

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)

Once your messages have been read and processed, get the offsets you just read and store them in Kafka, ZooKeeper, or an external transactional database.

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

Every time we start the job, fetch the offsets from the database and pass them to createDirectStream to get an exactly-once mechanism; a sketch of that start-up step follows.
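This is only a minimal sketch of that start-up step, assuming the spark-streaming-kafka-0-10 integration: loadOffsetsFromDb() is a hypothetical helper, the topic name and broker address are placeholders, and streamingContext is an existing StreamingContext assumed to be defined elsewhere.

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

// Consumer settings, as in the 0-10 integration guide; the broker address is an assumption.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "recovery-job",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Hypothetical helper: load the last successfully processed offsets from your store.
def loadOffsetsFromDb(): Map[TopicPartition, Long] =
  Map(new TopicPartition("test", 0) -> 100L, new TopicPartition("test", 1) -> 100L)

// Start the stream exactly where the stored offsets say processing stopped.
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("test"), kafkaParams, loadOffsetsFromDb())
)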

More reading:
http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

Upvotes: 0

jimijazz

Reputation: 2267

If you know the offset of the message you want to recover, and which partition it belonged to, you can use the KafkaConsumer seek method:

consumer.seek(new TopicPartition("topic-name", partNumber), offsetNumber);

as detailed here

The next call to poll() will give you the message you missed as the first one in the list.

This will only work in a scenario where you are managing your offsets yourself in the first place. If you are letting Kafka manage the offsets, you probably don't know the offset number, and at best you will probably end up with messages consumed twice (a call to poll() will begin consuming from the last committed offset).
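As a minimal sketch of that recovery path (in Scala, using the Kafka Java client; the topic name, partition, offset, and broker address are assumptions, and auto-commit is disabled so the offsets stay under your control):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")   // assumed broker address
props.put("group.id", "recovery-group")            // hypothetical group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("enable.auto.commit", "false")           // we manage offsets ourselves

val consumer = new KafkaConsumer[String, String](props)
val partition = new TopicPartition("topic-name", 0)    // partition that held the lost message
consumer.assign(Collections.singletonList(partition))  // explicit assignment, no group rebalance
consumer.seek(partition, 42L)                          // 42L = the known offset of the lost message

val records = consumer.poll(1000L)                     // newer clients take a java.time.Duration instead
records.asScala.foreach(r => println(s"offset=${r.offset} value=${r.value}"))
consumer.close()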

Upvotes: 1

Shankar

Reputation: 8967

Kafka follows at-least-once message delivery semantics: you might get duplicates in the event of a broker failure, but you will not lose data.

However, when you create the Kafka producer, if you set the following property to 0 it will try to send each message only once; even in the case of a broker failure it will not try to resend, so you might lose data if the broker fails.

props.put("retries", 0);

You can change this property to 1 (or higher) so the producer tries to send again. Also, offsets are managed in ZooKeeper automatically: only if a message is delivered successfully will the offsets in ZooKeeper be updated.
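For illustration, a small sketch of a producer configured to retry (in Scala, using the Kafka Java client; the broker address, topic, and retry count are assumptions, and acks=all is added here only as a common companion setting to reduce loss on broker failure):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")   // assumed broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("retries", "3")                          // retry failed sends instead of giving up
props.put("acks", "all")                           // wait for in-sync replicas before acknowledging

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("test", "key", "value"))
producer.close()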

Also, since you mentioned Spark Streaming for consuming, note that Spark Streaming supports two different approaches:

1. Receiver-based: offsets are handled in ZooKeeper.

2. Direct approach: offsets are handled locally where the messages are stored, and this approach supports exactly-once message delivery (see the sketch below).
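A short sketch of how the direct approach exposes offsets (assuming the spark-streaming-kafka-0-10 integration and a stream created with createDirectStream; committing back to Kafka is just one of the supported options):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // process the batch here ...
  // commit the offsets back to Kafka only after processing has succeeded
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}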

For more info check this link

Upvotes: 0
