Reputation: 5356
I know there has to be a way to do this, but I am not able to figure this out. I need to stop the kafka consumer once I have read all the messages from the queue.
Can somebody provide any info on this?
Upvotes: 15
Views: 21720
Reputation: 146
Currently, Kafka version 2.11-2.1.1 has a script called kafka-console-consumer.sh
.
It has a new flag: --timeout-ms
.
Basically, this flag is the maximum time to wait before exiting when there is no new log to wait. It's in millisecond term.
You can use this property to end you console consumer after reading all the messages.
Upvotes: 10
Reputation: 733
You can use SimpleConsumerShell with no-wait-at-logend option. See SystemTools-SimpleConsumerShell
For example:
./kafka-run-class.bat kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic kafkademo --partition 0 --no-wait-at-logend
Upvotes: 5
Reputation: 6572
You can pass parameter: -consumer-timeout-ms with a value when starting the consumer and it will throw an exception if no messages have been read during that time. For example, to stop the consumer if no new messages have arrived in the last 2 seconds: kafka.consumer.ConsoleConsumer -consumer-timeout-ms 2000
You can see this and all the other input options here
Upvotes: 10
Reputation: 3113
If you are not dead set on using the Scala client, try kafkacat with the -e
option telling it to exit when end of partition has been reached.
E.g. to consume all messages from mytopic partition 2 and then exit:
$ kafkacat -b mybroker -t mytopic -p 2 -o beginning -e
Or consume the last 3000 messages and then exit:
$ kafkacat -b mybroker -t mytopic -p 2 -o -3000 -e
Upvotes: 2