Zeeshan Bilal

Reputation: 1247

Kafka Connect JDBC sink fails in JsonConverter

I am working on a pipeline: MySQL -> Debezium -> Kafka -> Flink -> Kafka -> Kafka Connect JDBC -> MySQL. The following is a sample message I write from Flink (I also tried producing it with the Kafka console producer):

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      }
    ],
    "optional": true,
    "name": "user"
  },
  "payload": {
    "id": 1,
    "name": "Smith"
  }
}
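For reference, the value bytes can be built like this (a minimal Python sketch, mirroring the message above; the schema and field names are taken directly from it):

```python
import json

# Envelope that JsonConverter with schemas.enable=true expects:
# a top-level JSON object with exactly "schema" and "payload" fields.
envelope = {
    "schema": {
        "type": "struct",
        "fields": [
            {"type": "int64", "optional": False, "field": "id"},
            {"type": "string", "optional": True, "field": "name"},
        ],
        "optional": True,
        "name": "user",
    },
    "payload": {"id": 1, "name": "Smith"},
}

# Serialize as the Kafka record value (UTF-8 bytes).
value = json.dumps(envelope).encode("utf-8")

# Sanity check: both required fields present, nothing extra.
decoded = json.loads(value)
assert set(decoded) == {"schema", "payload"}
```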

but Connect fails in JsonConverter:

DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:338)

I have debugged it, and in the method `public SchemaAndValue toConnectData(String topic, byte[] value)` the `value` argument is null. My sink configuration is:

{
    "name": "user-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "user",
        "connection.url": "jdbc:mysql://localhost:3306/my_db?verifyServerCertificate=false",
        "connection.user": "root",
        "connection.password": "root",        
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value"
    }
}

Can someone please help me with this issue?

Upvotes: 0

Views: 1105

Answers (1)

Bartosz Wardziński

Reputation: 6593

I think the issue is not related to the value serialization of the Kafka message. It is rather a problem with the key of the message.

What is your key.converter? I suspect it is the same as your value.converter (org.apache.kafka.connect.json.JsonConverter). Your key is probably a plain String that does not contain schema and payload fields.

Try changing key.converter to org.apache.kafka.connect.storage.StringConverter.
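To see why a plain-string key trips JsonConverter, you can simulate the check it performs; this is an illustrative Python sketch (not Connect's actual code, and `user-1` is just a hypothetical key), contrasting how the key and value bytes parse:

```python
import json

# A typical record: plain-string key, schema-and-payload JSON value.
key_bytes = b"user-1"  # hypothetical plain-string key, not valid JSON
value_bytes = json.dumps({
    "schema": {"type": "struct", "fields": [], "optional": True, "name": "user"},
    "payload": {"id": 1},
}).encode("utf-8")

def json_converter_ok(data: bytes) -> bool:
    """Rough analogue of JsonConverter with schemas.enable=true:
    the bytes must parse as a JSON object holding exactly
    the "schema" and "payload" fields."""
    try:
        obj = json.loads(data)
    except ValueError:
        return False
    return isinstance(obj, dict) and set(obj) == {"schema", "payload"}

print(json_converter_ok(value_bytes))  # the value passes
print(json_converter_ok(key_bytes))    # the plain-string key does not

# StringConverter, by contrast, just decodes the raw bytes:
print(key_bytes.decode("utf-8"))
```

This is why switching only key.converter to StringConverter fixes the error while the value keeps its JSON envelope.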

For Kafka Connect you set default converters, but you can also set a specific one for a particular connector configuration (it will override the default). For that you have to modify your config request:

{
    "name": "user-sink",
    "config": {
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "user",
        "connection.url": "jdbc:mysql://localhost:3306/my_db?verifyServerCertificate=false",
        "connection.user": "root",
        "connection.password": "root",        
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value"
    }
}

Upvotes: 2
