c0degeas

Reputation: 832

Kafka console consumer shows invalid characters when consuming from a topic

While consuming from a Kafka topic with the Kafka console consumer or kt (a Go CLI tool for Kafka), I get invalid characters in the output:

...
\u0000\ufffd?\u0006app\u0000\u0000\u0000\u0000\u0000\u0000\u003e@\u0001
\u0000\u000cSec-39\u001aSome Actual Value Text\ufffd\ufffd\ufffd\ufffd\ufffd
\ufffd\u0015@\ufffd\ufffd\ufffd\ufffd\ufffd\ufff
...

However, Kafka Connect sinks the same data to an SQL database correctly.

Upvotes: 1

Views: 6801

Answers (2)

Robin Moffatt

Reputation: 32090

Given that you say

Kafka connect can actually sink the proper data to an SQL database.

my assumption would be that the data on the topic is serialized with Avro. Kafka Connect, when configured with the Avro converter, takes the Avro data and deserialises it.

However, console tools such as kafka-console-consumer, kt and kafkacat do not decode Avro by default, so if you use them to read data from an Avro-encoded topic you get a bunch of weird characters.
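
To see where those characters come from, here is a minimal Python sketch. It assumes the producer used Confluent's Avro serializer, which frames each message as a magic byte 0x00 plus a 4-byte big-endian schema ID, followed by the Avro binary body; in that body Avro writes a string as a zigzag-encoded varint length and then the UTF-8 bytes. The question's output fits that pattern: `\u0006app` is length 3 followed by "app", and `\u000cSec-39` is length 6 followed by "Sec-39". The sample message and the function names below are hypothetical.

```python
import struct


def parse_confluent_header(message: bytes) -> tuple[int, bytes]:
    """Split a Confluent-framed message into (schema_id, avro_payload).

    Assumes the Confluent wire format: magic byte 0x00, then a 4-byte
    big-endian schema ID, then the Avro binary body.
    """
    if len(message) < 5 or message[0] != 0:
        raise ValueError("not a Confluent Avro-framed message")
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]


def read_zigzag_long(buf: bytes, pos: int) -> tuple[int, int]:
    """Decode an Avro zigzag varint starting at pos; return (value, new_pos)."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift  # low 7 bits carry data
        if not byte & 0x80:  # high bit clear means last byte
            break
        shift += 7
    # Undo zigzag encoding: 0->0, 1->-1, 2->1, 3->-2, ...
    return (raw >> 1) ^ -(raw & 1), pos


def read_avro_string(buf: bytes, pos: int) -> tuple[str, int]:
    """Read an Avro string: zigzag length prefix followed by UTF-8 bytes."""
    length, pos = read_zigzag_long(buf, pos)
    end = pos + length
    return buf[pos:end].decode("utf-8"), end


# Hypothetical message: schema ID 42, then two Avro strings "app" and "Sec-39".
sample = b"\x00" + struct.pack(">I", 42) + b"\x06app" + b"\x0cSec-39"

schema_id, payload = parse_confluent_header(sample)
first, pos = read_avro_string(payload, 0)
second, pos = read_avro_string(payload, pos)
print(schema_id, first, second)  # prints: 42 app Sec-39
```

Decoding a real record also needs the writer schema looked up in Schema Registry by that ID, which is the part kafka-avro-console-consumer handles for you.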

To read Avro data from the command line, you can use kafka-avro-console-consumer:

kafka-avro-console-consumer \
         --bootstrap-server kafka:29092 \
         --topic test_topic_avro \
         --property schema.registry.url=http://schema-registry:8081

Edit: Adding a suggestion from @CodeGeas too:

Alternatively, you can read the data through REST Proxy:

# Create a consumer instance (in the consumer group my_json_consumer) that reads Avro data
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
         -H "Accept: application/vnd.kafka.v2+json" \
         --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
         http://kafka-rest-instance:8082/consumers/my_json_consumer

# Subscribe the consumer to a topic
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
         --data '{"topics":["YOUR-TOPIC-NAME"]}' \
         http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription

# Then consume some data from a topic using the base URL in the first response.
curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
         http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

When you are done, delete the consumer instance:

curl -X DELETE -H "Accept: application/vnd.kafka.avro.v2+json" \
         http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance
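
The same four REST Proxy calls can be scripted. The Python sketch below, using only the standard library, assumes the host, group and instance names from the curl commands; `rest_proxy_steps` and `send` are hypothetical helper names. It lists each call as (method, URL, headers, body) so the sequence is easy to inspect, and `send` performs one call against a running proxy.

```python
import json
import urllib.request
from typing import List, Optional, Tuple

# Assumed values copied from the curl commands; adjust for your deployment.
BASE_URL = "http://kafka-rest-instance:8082"
GROUP = "my_json_consumer"
INSTANCE = "my_consumer_instance"
V2_JSON = "application/vnd.kafka.v2+json"
AVRO_V2_JSON = "application/vnd.kafka.avro.v2+json"

Step = Tuple[str, str, dict, Optional[bytes]]


def rest_proxy_steps(topic: str) -> List[Step]:
    """Return (method, url, headers, body) for each call in the curl sequence."""
    instance_url = f"{BASE_URL}/consumers/{GROUP}/instances/{INSTANCE}"
    create_body = {"name": INSTANCE, "format": "avro", "auto.offset.reset": "earliest"}
    return [
        # 1. Create an Avro consumer instance in the consumer group.
        ("POST", f"{BASE_URL}/consumers/{GROUP}",
         {"Content-Type": V2_JSON, "Accept": V2_JSON},
         json.dumps(create_body).encode("utf-8")),
        # 2. Subscribe the instance to the topic.
        ("POST", f"{instance_url}/subscription",
         {"Content-Type": V2_JSON},
         json.dumps({"topics": [topic]}).encode("utf-8")),
        # 3. Fetch records; the proxy returns the decoded Avro values as JSON.
        ("GET", f"{instance_url}/records", {"Accept": AVRO_V2_JSON}, None),
        # 4. Delete the instance once you are done with it.
        ("DELETE", instance_url, {"Accept": AVRO_V2_JSON}, None),
    ]


def send(step: Step):
    """Perform one call against a running REST Proxy and parse any JSON reply."""
    method, url, headers, body = step
    request = urllib.request.Request(url, data=body, headers=headers, method=method)
    with urllib.request.urlopen(request) as response:
        payload = response.read()
    return json.loads(payload) if payload else None
```

Against a live proxy you could run `for step in rest_proxy_steps("YOUR-TOPIC-NAME"): print(send(step))`. The first GET on `/records` may come back empty while the consumer is still joining its group, so in practice you may need to poll that step more than once before deleting the instance.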

Upvotes: 2

Mickael Maison

Reputation: 26885

By default, the console consumer deserializes both the message key and the value with ByteArrayDeserializer, and then prints those raw bytes to the command line using the default formatter. Binary data, such as Avro-encoded values, therefore shows up as unreadable characters.
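
A minimal Python illustration of that effect, using a hypothetical byte string and Python's `bytes.decode` and `json.dumps` rather than the console consumer's own code: decoding binary data as UTF-8 text turns each invalid byte into U+FFFD, and JSON-escaping the result writes control bytes as `\u0000`-style escapes. That is the pattern in the question's output. The `\u003e` there (an escaped `>`) also suggests kt JSON-escaped its output, since Go's JSON encoder escapes `<`, `>` and `&` by default.

```python
import json

# Hypothetical Avro-framed value: magic byte 0x00, 4-byte schema ID 1,
# an Avro string "app" (length prefix 0x06), then two bytes that are not valid UTF-8.
raw_value = b"\x00\x00\x00\x00\x01\x06app\xff\xfe"

# Treating the bytes as UTF-8 text: each invalid byte becomes U+FFFD,
# while control bytes such as 0x00 and 0x06 pass through unchanged.
text = raw_value.decode("utf-8", errors="replace")

# JSON-escaping that text (ensure_ascii is on by default) writes the control
# bytes as \u0000-style escapes and U+FFFD as \ufffd.
escaped = json.dumps(text)
print(escaped)  # prints "\u0000\u0000\u0000\u0000\u0001\u0006app\ufffd\ufffd"
```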

The console consumer does, however, let you customize the deserializers and the formatter it uses. See the following extract from its help output:

--formatter <String: class>              The name of a class to use for
                                           formatting kafka messages for
                                           display. (default: kafka.tools.
                                           DefaultMessageFormatter)
--property <String: prop>                The properties to initialize the
                                           message formatter. Default
                                           properties include:
                                            print.timestamp=true|false
                                            print.key=true|false
                                            print.value=true|false
                                            key.separator=<key.separator>
                                            line.separator=<line.separator>
                                            key.deserializer=<key.deserializer>
                                            value.deserializer=<value.
                                           deserializer>
                                         Users can also pass in customized
                                           properties for their formatter; more
                                           specifically, users can pass in
                                           properties keyed with 'key.
                                           deserializer.' and 'value.
                                           deserializer.' prefixes to configure
                                           their deserializers.
--key-deserializer <String:
  deserializer for key>
--value-deserializer <String:
  deserializer for values>

With these options, you can change how keys and values are deserialized and printed, and so get the output you want.
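
To make the deserializer/formatter split concrete, here is a hypothetical Python analogue (not Kafka's Java API): a formatter that delegates key and value rendering to pluggable deserializer functions. The `print_key` and `key_separator` parameters only mirror the `print.key` and `key.separator` property names from the help text; the tab default and all function names are choices made for this sketch. With the real tool you pass Java class names instead, for example `--value-deserializer org.apache.kafka.common.serialization.StringDeserializer` together with `--property print.key=true`.

```python
from typing import Callable, Optional

# A "deserializer" here is just a function from raw bytes to printable text.
Deserializer = Callable[[bytes], str]


def utf8_deserializer(data: bytes) -> str:
    """Treat bytes as UTF-8 text, replacing invalid sequences with U+FFFD."""
    return data.decode("utf-8", errors="replace")


def hex_deserializer(data: bytes) -> str:
    """Render bytes as space-separated hex so binary payloads stay readable."""
    return data.hex(" ")


def format_record(key: Optional[bytes], value: bytes,
                  key_deser: Deserializer, value_deser: Deserializer,
                  print_key: bool = False, key_separator: str = "\t") -> str:
    """Hypothetical formatter: deserialize key and value, then join them."""
    rendered_value = value_deser(value)
    if not print_key:
        return rendered_value
    rendered_key = "null" if key is None else key_deser(key)
    return f"{rendered_key}{key_separator}{rendered_value}"


# Same binary value, rendered with a text deserializer and with a hex one.
value = b"\x00\x00\x00\x00\x01\x06app"
print(repr(format_record(b"k1", value, utf8_deserializer, utf8_deserializer, print_key=True)))
print(format_record(b"k1", value, utf8_deserializer, hex_deserializer, print_key=True))
```

Swapping only the value deserializer changes the output from control characters to readable hex, without touching the formatter itself.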

Upvotes: 1
