Reputation: 832
While consuming from a Kafka topic using the Kafka console consumer or kt (a Go CLI tool for Kafka), I get invalid characters:
...
\u0000\ufffd?\u0006app\u0000\u0000\u0000\u0000\u0000\u0000\u003e@\u0001
\u0000\u000cSec-39\u001aSome Actual Value Text\ufffd\ufffd\ufffd\ufffd\ufffd
\ufffd\u0015@\ufffd\ufffd\ufffd\ufffd\ufffd\ufff
...
Even so, Kafka Connect can actually sink the proper data to an SQL database.
Upvotes: 1
Views: 6801
Reputation: 32090
Given that you say "Kafka Connect can actually sink the proper data to an SQL database", my assumption is that you're using Avro serialization for the data on the topic. When configured correctly, Kafka Connect takes the Avro data and deserializes it.
However, console tools such as kafka-console-consumer, kt, kafkacat, et al. do not decode Avro by default, so you get a bunch of unreadable characters when you use them to read from a topic that is Avro-encoded.
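To see why the plain tools print garbage, it can help to look at the raw bytes. The following is only a minimal sketch, and it assumes the producer used Confluent's Schema Registry serializer (the question does not confirm this). Messages written that way start with a zero "magic" byte and a 4-byte big-endian schema ID, followed by the Avro binary payload, none of which is meant to be printed as text. The function name schema_id_of and the sample bytes are made up for illustration.

```shell
# Hypothetical helper: read one raw message value on stdin and print the
# Schema Registry ID from its 5-byte header
# (magic byte 0x00 + 4-byte big-endian schema ID).
schema_id_of() {
  # Hex-dump only the first 5 bytes, e.g. "000000002a".
  header=$(head -c 5 | od -An -tx1 | tr -d ' \n')
  case "$header" in
    # Magic byte 00 followed by exactly 8 hex digits of schema ID.
    00????????) printf '%d\n' "0x${header#00}" ;;
    *) echo "no Confluent header found" >&2; return 1 ;;
  esac
}

# Made-up sample: header with schema ID 42, then the Avro string "Sec-39"
# (length 6, zigzag-encoded as 0x0c, followed by its UTF-8 bytes).
printf '\000\000\000\000\052\014Sec-39' | schema_id_of   # prints 42
```

If the first byte is not zero, the function reports that no header was found, which would suggest the data was not written through the Schema Registry serializer.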
To read Avro data on the command line, you can use kafka-avro-console-consumer:
kafka-avro-console-consumer \
  --bootstrap-server kafka:29092 \
  --topic test_topic_avro \
  --property schema.registry.url=http://schema-registry:8081
Edit: Adding a suggestion from @CodeGeas too:
Alternatively, reading data using REST Proxy can be done with the following:
# Create a consumer instance that reads Avro data
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
  -H "Accept: application/vnd.kafka.v2+json" \
  --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
  http://kafka-rest-instance:8082/consumers/my_json_consumer
# Subscribe the consumer to a topic
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"topics":["YOUR-TOPIC-NAME"]}' \
  http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
# Then consume some data from the topic using the base URL in the first response
curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
  http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
To delete the consumer afterwards:
curl -X DELETE -H "Accept: application/vnd.kafka.avro.v2+json" \
http://kafka-rest-instance:8082/consumers/my_json_consumer/instances/my_consumer_instance
Upvotes: 2
Reputation: 26885
By default, the console consumer tool deserializes both the message key and value using ByteArrayDeserializer, and then tries to print the resulting bytes to the command line using the default formatter.
This tool, however, lets you customize the deserializers and the formatter used. See the following extract from the help output:
--formatter <String: class>             The name of a class to use for
                                          formatting kafka messages for
                                          display. (default: kafka.tools.
                                          DefaultMessageFormatter)
--property <String: prop>               The properties to initialize the
                                          message formatter. Default
                                          properties include:
                                          print.timestamp=true|false
                                          print.key=true|false
                                          print.value=true|false
                                          key.separator=<key.separator>
                                          line.separator=<line.separator>
                                          key.deserializer=<key.deserializer>
                                          value.deserializer=<value.
                                          deserializer>
                                        Users can also pass in customized
                                          properties for their formatter; more
                                          specifically, users can pass in
                                          properties keyed with 'key.
                                          deserializer.' and 'value.
                                          deserializer.' prefixes to configure
                                          their deserializers.
--key-deserializer <String:
  deserializer for key>
--value-deserializer <String:
  deserializer for values>
Using these settings, you should be able to change the output to be what you want.
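As a rough illustration of those options, the sketch below wraps the console consumer in a small shell function that switches both deserializers to StringDeserializer and prints the key next to the value. The function name consume_as_strings, the " | " separator, and the broker and topic arguments are assumptions for illustration. This only helps when the data really is plain text; Avro-encoded values will still look garbled with a string deserializer.

```shell
# Hypothetical helper: run the console consumer with string deserializers
# and print each key next to its value.
#   $1 = bootstrap server (host:port), $2 = topic name
consume_as_strings() {
  bootstrap="$1"
  topic="$2"
  kafka-console-consumer \
    --bootstrap-server "$bootstrap" \
    --topic "$topic" \
    --from-beginning \
    --key-deserializer org.apache.kafka.common.serialization.StringDeserializer \
    --value-deserializer org.apache.kafka.common.serialization.StringDeserializer \
    --property print.key=true \
    --property "key.separator= | "
}
```

It could be called as, for example, consume_as_strings kafka:29092 test_topic, with your own broker address and topic name substituted.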
Upvotes: 1