divinedragon
divinedragon

Reputation: 5356

Terminate Kafka Console Consumer when all the messages have been read

I know there has to be a way to do this, but I am not able to figure this out. I need to stop the kafka consumer once I have read all the messages from the queue.

Can somebody provide any info on this?

Upvotes: 15

Views: 21720

Answers (4)

Planck35
Planck35

Reputation: 146

Currently, Kafka version 2.11-2.1.1 has a script called kafka-console-consumer.sh.

It has a new flag: --timeout-ms.

Basically, this flag is the maximum time to wait before exiting when there is no new log to wait. It's in millisecond term.

You can use this property to end you console consumer after reading all the messages.

Upvotes: 10

Ujjwal Wadhawan
Ujjwal Wadhawan

Reputation: 733

You can use SimpleConsumerShell with no-wait-at-logend option. See SystemTools-SimpleConsumerShell

For example:

./kafka-run-class.bat kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic kafkademo --partition 0 --no-wait-at-logend

Upvotes: 5

Lundahl
Lundahl

Reputation: 6572

You can pass parameter: -consumer-timeout-ms with a value when starting the consumer and it will throw an exception if no messages have been read during that time. For example, to stop the consumer if no new messages have arrived in the last 2 seconds: kafka.consumer.ConsoleConsumer -consumer-timeout-ms 2000

You can see this and all the other input options here

Upvotes: 10

Edenhill
Edenhill

Reputation: 3113

If you are not dead set on using the Scala client, try kafkacat with the -e option telling it to exit when end of partition has been reached.

E.g. to consume all messages from mytopic partition 2 and then exit:

$ kafkacat -b mybroker -t mytopic -p 2 -o beginning -e

Or consume the last 3000 messages and then exit:

$ kafkacat -b mybroker -t mytopic -p 2 -o -3000 -e

Upvotes: 2

Related Questions