Reputation: 529
I'm trying to write JSON with the Kafka HDFS Sink connector.
I have the following properties (connect-standalone.properties):
key.converter.schemas.enable = false
value.converter.schemas.enable = false
schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
And in my connector properties:
format.class=io.confluent.connect.hdfs.json.JsonFormat
And I got the following exception:
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error
... Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'test': was expecting 'null', 'true', 'false' or NaN at [Source: (byte[])"test"; line: 1, column: 11]
My JSON is valid.
How can I solve this, please?
I'm also trying with a simple sample JSON like:
{"key":"value"}
and I still get the same error.
Thanks.
Upvotes: 0
Views: 918
Reputation: 191973
According to the error, not all the messages in the topic are actually JSON objects. The latest messages might be valid, or the Kafka record values might be valid (though not the keys), but the error shows that the converter tried to read a plain string, (byte[])"test", which is not valid JSON.
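One way to verify this is to dump the raw records from the beginning of the topic with the console consumer; the broker address and topic name below are placeholders for your own values, and print.key lets you inspect the keys as well:

kafka-console-consumer --bootstrap-server localhost:9092 --topic your-topic --from-beginning --property print.key=true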
If you only want text data in HDFS, you could use the String format; however, that won't have Hive integration:
format.class=io.confluent.connect.hdfs.string.StringFormat
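For reference, a minimal standalone sink config using that format could look like the following sketch (the connector name, topic, HDFS URL, and flush size are placeholder values; StringConverter passes each message through as plain text):

name=hdfs-string-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
topics=your-topic
hdfs.url=hdfs://localhost:9000
flush.size=3
format.class=io.confluent.connect.hdfs.string.StringFormat
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter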
If you did want to use Hive with this format, you would need to define the JSON SerDe yourself.
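For example, a sketch of an external Hive table over the connector's output, assuming the hive-hcatalog JSON SerDe is on Hive's classpath and the connector's default /topics output directory (the table name, column, and path are illustrative):

CREATE EXTERNAL TABLE your_topic_json (`key` STRING)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/topics/your-topic';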
Upvotes: 1