Kafka Consumer get key value pair

I'm currently working with Kafka and Flink, I have kafka running in my local PC and I created a topic that is being consumed.

Desktop\kafka\bin\windows>kafka-console-consumer.bat --zookeeper localhost:2181 -topic test

but it is just retrieving the message, enter image description here

is there a way to get futher details about the message ? lets say time? key? I checked kafka documentation but I didn't find something about this topic

Upvotes: 52

Views: 73019

Answers (3)

sunil maurya
sunil maurya

Reputation: 21

Below command to print key, value, partition, offset, message-published-timestamp

kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning \
-topic test-topic \
--property print.key=true \
--property print.value=true \ 
--property key.separator="-" \
--property print.timestamp=true \
--property print.offset=true \
--property print.partition=true \
--property key.deserialzer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Upvotes: 0

ankit kansal
ankit kansal

Reputation: 239

Use below command:

kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_name \ 
      --from-beginning --formatter kafka.tools.DefaultMessageFormatter \
      --property print.key=true --property print.value=true \
      --property key.deserialzer=org.apache.kafka.common.serialization.StringDeserializer \
      --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Upvotes: 19

Luciano Afranllie
Luciano Afranllie

Reputation: 4263

Using out of the box console consumer (I am using Kafka 0.9.0.1) you can only print the key and the value of messages using different formats. To print the key, set the property print.key=true.

There is another property key.separator that by default is "\t" (a tab) that you can also change to anything you want.

To set these properties you can create a config file and use --consumer.config <config file> or pass the properties using --property key=value.

You can also implement your own formatter and use it with --formatter option but you will still have just the key and value because that is what the MessageFormatter trait provides (see writeTo below).

trait MessageFormatter {
    def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)

    def init(props: Properties) {}

    def close() {}
}

For example:

./bin/kafka-console-consumer.sh --new-consumer --bootstrap-server kafka-1:9092 --topic topic1 --property print.key=true --property key.separator="-" --from-beginning
key-p1
key-p2
key-p3
null-4
null-8
null-0

Upvotes: 101

Related Questions