Reputation: 1247
I am working on a design MySQL -> Debezium -> Kafka -> Flink -> Kafka -> Kafka Connect JDBC -> MySQL. Following is sample message i write from Flink (I also tried using Kafka console producer as well)
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int64",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "name"
}
],
"optional": true,
"name": "user"
},
"payload": {
"id": 1,
"name": "Smith"
}
}
but connect failed on JsonConverter
DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:338)
I have debugged and in method public SchemaAndValue toConnectData(String topic, byte[] value)
value is null. My sink configurations are:
{
"name": "user-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "user",
"connection.url": "jdbc:mysql://localhost:3306/my_db?verifyServerCertificate=false",
"connection.user": "root",
"connection.password": "root",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value"
}
}
Can someone please help me on this issue?
Upvotes: 0
Views: 1105
Reputation: 6593
I think an issue is not related with the value serialization (of Kafka message). It is rather problem with the key of the message.
What is your key.converter
? I think it is the same like value.converter
(org.apache.kafka.connect.json.JsonConverter
). Your key might be simple String
, that doesn't contain schema
, payload
Try to change key.converter
to org.apache.kafka.connect.storage.StringConverter
For Kafka Connect you set default Converters
, but you can also set specific one for your particular Connector configuration (that will overwrite default one). For that you have to modify your config request:
{
"name": "user-sink",
"config": {
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "user",
"connection.url": "jdbc:mysql://localhost:3306/my_db?verifyServerCertificate=false",
"connection.user": "root",
"connection.password": "root",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value"
}
}
Upvotes: 2