TakeSoUp
TakeSoUp

Reputation: 8077

Kafka console consumer: How to get only the last N messages from a topic instead of everything from the beginning?

I can do this:

./bin/kafka-avro-console-consumer --zookeeper 10.0.0.225:2181/kafka
--topic myTopic --property schema.registry.url=http://10.0.0.100:8081 
--from-beginning

But I have too many messages. I would rather only get the last N ones. How can I do that with kafka console consumer?

Upvotes: 36

Views: 77935

Answers (6)

Basan
Basan

Reputation: 111

./bin/kafka-console-consumer.sh --bootstrap-server kafka.internal:9093 \
--max-messages 1 --group basan_PERF_console \
--consumer.config config-stage.properties \
--partition 1 --offset 14677669 \
--topic order-demandh

This is the way to retrieve one message at a time from specific offset

Upvotes: 10

Pardeep Sharma
Pardeep Sharma

Reputation: 582

First get the offset value in all partitions:

kafka-run-class.bat kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topicName

topicName:0:15 topicName:1:16 topicName:2:10

Now get the N number of messages from the particular topic and partition. I used offset number as 10 because I want to read only 5 messages from partition-0.

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topicName --offset 10 --partition 0

It'll print last 5 messages from partition-0.

Remember Kafka does not guarantee ordering of messages between partitions. It does provide ordering within a partition.

Upvotes: 2

Michael Heil
Michael Heil

Reputation: 18505

You could also use the tool kafkacat which is documented for example here.

This is a very powerful and fast tool to read data out of Kafka from the console and is open source: https://github.com/edenhill/kafkacat.

Many exmples are provided on GitHub and one example is shown below to read the last n (here: 2000) messages out of the partition 0 of topic 'mytopic':

kafkacat -C -b mybroker -t mytopic -p 0 -o -2000

Upvotes: 3

Casey
Casey

Reputation: 87

I can confirm that the max-messages argument can be passed to both the kafka-console-consumer and kafka-avro-console-consumer cli tools.

Because I've had to look a lot of this stuff individually, the below example uses the kafka-avro-console-consumer to consume the first 10 Avro records, from an SSL enabled topic, with schema registry configured, and prints the String keys along with the records.

./kafka-avro-console-consumer \
--bootstrap-server something.com:9093,something2.com:9093 \
--consumer.config /path/to/client-ssl.properties \
--property schema.registry.url=http://some-schema-registry.com \
--property print.key=true \
--key-deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--from-beginning \
--max-messages 10 \
--topic some-topic-name

Upvotes: 0

apostl3pol
apostl3pol

Reputation: 984

I'm just getting started with Kafka, so there may be a better way to do this, but it prints the last 10 messages from the first partition returned by kafka-run-class kafka.tools.GetOffsetShell, then continues to output new messages as they come in.

offsets=$(kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list $BROKER_1:9092,$BROKER_2:9092 --topic my_topic)
kafka-avro-console-consumer --bootstrap-server $BROKER_1:9092 \
  --topic my_topic \
  --partition $(echo $offsets | awk -F':' '{print $2}') \
  --offset $(($(echo $offsets | awk -F':| ' '{print $3}') - 10)) \
  --property schema.registry.url=http://$WORKER_1:8081

Replace the 10 above with the desired number of messages to print.

Upvotes: 5

mlg
mlg

Reputation: 1511

If you want to stick with the bundled binaries, you need to use the simple consumer shell:

bin/kafka-simple-consumer-shell.sh --broker-list mybroker:9092 --topic
mytopic --partition mypartition --offset myoffset

I recommend kt; it's much faster, more lightweight and has better options.

Upvotes: 14

Related Questions