ajay mittal


Read unprocessed messages in Apache Kafka using Simple Consumer

I tried following the link

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

to use SimpleConsumer to consume messages, but while using it I ran into some unexpected behavior:

My consumers consume messages from a specific partition. When my consumer is running and I push messages to the topic with a producer, it consumes them from that partition as expected. But if the consumer is not running while I push some messages to the topic, then when I start it again it does not consume those messages; it only picks up messages that are pushed after it starts. I am using LatestTime() instead of EarliestTime() because I want to consume only unprocessed messages.

For example:

Case 1

Consumer is running:

Producer pushes messages M1, M2, M3 to partition 1 of topic 1.

Result: the consumer consumes all three messages.

Case 2

Consumer is not running.

Producer now pushes messages M4, M5, M6 to partition 1 of topic 1.

Consumer is started now.

Result: the consumer does not consume messages M4, M5, M6, but if I check the offset, it is set to 7. This means the producer advanced the offset to 7 while producing messages, so the consumer will only consume messages from offset 7 onwards.

Please help: ideally, when the consumer comes up again it should read messages starting from M4.


Answers (1)

serejja


You're doing it wrong.

First of all, I'm not sure SimpleConsumer is what you're looking for. It forces you to manage offsets yourself: it does not commit offsets to ZooKeeper at all, so every time you start a SimpleConsumer again from the same starting offset it will fetch the same messages again. SimpleConsumer has no notion of a "processed message". All it can do is start fetching from some offset and keep fetching until you tell it to stop.
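To illustrate what "managing offsets yourself" involves, here is a minimal Python sketch. It does not talk to Kafka at all: a plain list stands in for one partition (offset i holds log[i]), and OffsetStore, consume and the topic/partition names are hypothetical helpers made up for this example. In a real setup the store would be something your code writes after processing each message, such as a file, a database row or a ZooKeeper node.

```python
from typing import Dict, List, Tuple

# Hypothetical stand-in for one Kafka partition: offset i holds log[i].
Partition = List[str]


class OffsetStore:
    """Hypothetical store for the next offset to read, per (topic, partition).

    SimpleConsumer commits nothing for you, so your own code has to keep
    something like this up to date.
    """

    def __init__(self) -> None:
        self._offsets: Dict[Tuple[str, int], int] = {}

    def load(self, topic: str, partition: int) -> int:
        # -1 means "nothing committed yet", i.e. an invalid offset.
        return self._offsets.get((topic, partition), -1)

    def commit(self, topic: str, partition: int, next_offset: int) -> None:
        self._offsets[(topic, partition)] = next_offset


def consume(log: Partition, start: int, store: OffsetStore,
            topic: str, partition: int) -> List[str]:
    """Read from `start` to the end of the log, committing after each message."""
    processed: List[str] = []
    for offset in range(start, len(log)):
        processed.append(log[offset])                # "process" the message
        store.commit(topic, partition, offset + 1)   # next offset to read
    return processed
```

Committing offset + 1 (the next offset to read) only after a message has been processed means that a restarted consumer which loads that offset resumes at M4, instead of re-reading M1 to M3 or jumping to the end of the log.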

Anyway, if you intend to commit processed offsets yourself, you should use EarliestTime (the equivalent of the auto.offset.reset=smallest config entry). auto.offset.reset means that if your consumer is initialized with an invalid offset (and SimpleConsumer is initialized with offset -1, if I remember correctly, which is obviously invalid), it will reset to either the smallest available offset (EarliestTime) or the largest available offset (LatestTime).
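The reset rule can be written as a small function. This is a hypothetical model of the behavior described above, not Kafka's own code: `earliest` is the smallest offset still on the broker, `latest` is the offset the next produced message will get, and the `Reset` enum simply mirrors the smallest/largest values of auto.offset.reset.

```python
from enum import Enum


class Reset(Enum):
    SMALLEST = "smallest"  # analogous to EarliestTime
    LARGEST = "largest"    # analogous to LatestTime


def resolve_start_offset(requested: int, earliest: int, latest: int,
                         reset: Reset) -> int:
    """Return the offset to start fetching from (hypothetical model).

    A requested offset inside [earliest, latest] is used as-is; anything
    else (for example -1, meaning "no offset yet") is reset according to
    the policy.
    """
    if earliest <= requested <= latest:
        return requested
    return earliest if reset is Reset.SMALLEST else latest
```

In this model a valid offset always wins; the reset policy only decides what happens when there is nothing usable to resume from.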

To make it clearer, here's an example:

Your Case 1:

You create a consumer and point it at topic 1, partition 1. Since it is initialized with an invalid offset, it asks the broker for a proper one (this is where the smallest or largest offset reset comes in). If you have not produced any messages yet, the smallest and largest offsets are both 0, so when you produce some messages your consumer fetches them.

Your Case 2:

You produce N messages (say 7). Then you start your SimpleConsumer. Again, it is initialized with an invalid offset and asks the broker for a proper one. With the smallest reset it will be 0, and with the largest it will be 7. Since in your example you use LatestTime, your consumer re-initializes at offset 7 and starts consuming from there, skipping the messages already in the partition.
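Both cases can be replayed with a toy model. Again this is only a sketch under stated assumptions: a Python list plays the partition, nothing is ever deleted from it (so the smallest offset is always 0), and `start_offset`/`fetch` are hypothetical helpers standing in for the broker's offset lookup and fetch.

```python
from typing import List


def start_offset(log: List[str], reset: str) -> int:
    """Offset a consumer with no valid offset resets to (hypothetical model).

    'smallest' -> first offset still in the log (0 here, nothing is deleted);
    'largest'  -> the offset the next produced message will receive.
    """
    if reset == "smallest":
        return 0
    if reset == "largest":
        return len(log)
    raise ValueError(f"unknown reset policy: {reset!r}")


def fetch(log: List[str], offset: int) -> List[str]:
    """Everything currently in the log from `offset` onwards."""
    return log[offset:]


# Case 1: the consumer starts on an empty partition, then messages arrive.
log: List[str] = []
case1 = {p: start_offset(log, p) for p in ("smallest", "largest")}
log += ["M1", "M2", "M3"]
case1_seen = {p: fetch(log, off) for p, off in case1.items()}

# Case 2: seven messages are produced before the consumer starts.
log2 = [f"m{i}" for i in range(1, 8)]
case2 = {p: start_offset(log2, p) for p in ("smallest", "largest")}
case2_seen = {p: fetch(log2, off) for p, off in case2.items()}

print(case1, case1_seen)
print(case2, {p: len(msgs) for p, msgs in case2_seen.items()})
```

With "largest", Case 2 fetches nothing until new messages arrive, which matches the behavior described in the question; with "smallest", the seven existing messages are read. In Case 1 both policies give the same result because the partition was empty at startup.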

In general, take a look at the high-level consumer; in most cases that's what you are looking for. Here's the link

