Reputation: 21
I tried following the example at https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example to consume messages with SimpleConsumer, but while using it I ran into some unexpected behavior:
The consumer consumes messages from a specific partition. When my consumer is running and I push messages to the topic with a producer, it consumes them from that partition. But if the consumer is not running, I push some messages to the topic, and then start the consumer again, it does not consume the messages the producer pushed while it was down. It only picks up messages pushed after it starts. I am using LatestTime() instead of EarliestTime() because I want to consume only unprocessed messages.
For example:
Case-1
Consumer is running.
Producer pushes messages m1, m2, m3 to partition 1 of topic 1.
Result: the consumer consumes all three messages.
Case-2
Consumer is not running.
Producer now pushes messages m4, m5, m6 to partition 1 of topic 1.
Consumer is started.
Result: the consumer does not consume messages m4, m5, m6, but if I check the offset, it is set to 7. This means the producer advanced the offset to 7 while producing messages, so the consumer will now consume messages starting from offset 7.
Please help. Ideally, when the consumer comes up again, it should read messages starting from m4.
Upvotes: 1
Views: 4242
Reputation: 23881
You're doing it wrong.
First of all, I'm not sure SimpleConsumer is what you're looking for. It forces you to manage offsets yourself (e.g. it does not commit offsets to Zookeeper at all, and every time you start a SimpleConsumer again it will fetch the same messages again). SimpleConsumer has no notion of a "processed message". All it can do is start fetching from some offset and keep fetching until you tell it to stop.
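Because SimpleConsumer leaves offset bookkeeping to you, resuming at m4 means your own code has to remember the next offset after each processed message and start from it on restart. The sketch below is a self-contained model, not Kafka code: the partition is an in-memory list (offsets start at 0 here, so m4 sits at offset 3), and the hypothetical offsetStore map stands in for whatever durable storage you would actually use (Zookeeper, a file, a database). When nothing is stored yet it starts from offset 0, which corresponds to EarliestTime.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of manual offset management; no Kafka classes are used.
class ManualOffsetDemo {
    // Hypothetical stand-in for durable offset storage, keyed by "topic-partition".
    static final Map<String, Long> offsetStore = new HashMap<>();

    // Consumes from the stored offset (or 0 if none is stored, like EarliestTime)
    // to the end of the log, saving offset + 1 after each processed message.
    static List<String> consume(String key, List<String> partitionLog) {
        long next = offsetStore.getOrDefault(key, 0L);
        List<String> processed = new ArrayList<>();
        while (next < partitionLog.size()) {
            String msg = partitionLog.get((int) next);
            processed.add(msg);          // "process" the message
            next++;
            offsetStore.put(key, next);  // record progress only after processing
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>(Arrays.asList("m1", "m2", "m3"));
        System.out.println(consume("topic1-1", log)); // [m1, m2, m3]
        // Consumer is down while the producer appends three more messages.
        log.addAll(Arrays.asList("m4", "m5", "m6"));
        // On restart the consumer resumes from the stored offset (3), i.e. m4.
        System.out.println(consume("topic1-1", log)); // [m4, m5, m6]
    }
}
```

Storing the offset only after processing means a crash between the two steps causes a message to be reprocessed rather than skipped.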
Anyway, if you intend to commit processed offsets yourself, you should use EarliestTime (the auto.offset.reset=smallest config entry). auto.offset.reset means that if your consumer is initialized with a wrong offset (and SimpleConsumer is initialized with a -1 offset, if I remember correctly, which is obviously wrong), it will reset to either the smallest available offset (EarliestTime) or the largest available offset (LatestTime).
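The reset rule described above can be modelled as a small function. This is a hypothetical sketch of the decision, not Kafka's implementation: logStart and logEnd stand for the smallest and largest offsets the broker reports for the partition, and an offset is treated as valid if it lies between them (fetching exactly at logEnd simply returns nothing until new messages arrive).

```java
// Hypothetical model of the smallest/largest offset reset; not Kafka source code.
class OffsetReset {
    enum Policy { SMALLEST, LARGEST }

    // Returns the offset the consumer would actually start fetching from.
    static long resolve(long requested, long logStart, long logEnd, Policy policy) {
        boolean valid = requested >= logStart && requested <= logEnd;
        if (valid) {
            return requested; // a valid offset is used as-is
        }
        // Invalid offset (e.g. -1 on a fresh consumer): fall back to the policy.
        return policy == Policy.SMALLEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // Partition holding 7 messages at offsets 0..6, so logEnd is 7.
        System.out.println(resolve(-1, 0, 7, Policy.SMALLEST)); // 0
        System.out.println(resolve(-1, 0, 7, Policy.LARGEST));  // 7
        System.out.println(resolve(3, 0, 7, Policy.LARGEST));   // 3
    }
}
```

Note that neither policy means "only unprocessed messages": smallest replays everything still in the log, and largest skips everything already there.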
To make it clearer, here's an example:
Your Case-1: you create a consumer and point it to topic 1, partition 1. Because it is initialized with a wrong offset, it asks the broker for a proper offset (this is where the smallest or largest offset reset comes in). If you have not produced any messages yet, the smallest and largest offsets will both be 0, so when you produce some messages your consumer will fetch them.
Case-2: you produce N messages (say 7). Then you start your SimpleConsumer. Again, it is initialized with a wrong offset and asks the broker for a proper offset. With the smallest reset the offset will be 0, and with the largest it will be 7. Since in your example you use LatestTime, your consumer re-initializes with offset 7 and starts consuming from there.
In general, take a look at the high-level consumer; in most cases that's what you are looking for. Here's the link
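For comparison, the 0.8 high-level consumer stores offsets per consumer group in Zookeeper, so a restarted consumer in the same group continues where the group left off. Below is a hedged sketch of the properties one might use; the Zookeeper address and group name are placeholders. Actually creating the consumer (passing new kafka.consumer.ConsumerConfig(props) to kafka.consumer.Consumer.createJavaConsumerConnector) needs the Kafka 0.8 jar, so that step is only mentioned in a comment here.

```java
import java.util.Properties;

// Sketch of Kafka 0.8 high-level consumer settings. In a real application these
// would go into new kafka.consumer.ConsumerConfig(props), omitted here because it
// requires the Kafka 0.8 library.
class HighLevelConsumerConfig {
    static Properties build(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zookeeper);
        // Offsets are tracked per consumer group in Zookeeper.
        props.setProperty("group.id", groupId);
        // Used only when the group has no committed offset or it is out of range.
        props.setProperty("auto.offset.reset", "smallest");
        // Periodically commit consumed offsets to Zookeeper.
        props.setProperty("auto.commit.enable", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:2181", "my-group");
        System.out.println(props.getProperty("auto.offset.reset")); // smallest
    }
}
```

With this setup, auto.offset.reset=smallest only matters the very first time the group runs; after that, restarts resume from the group's committed offset.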
Upvotes: 1