AE93

Reputation: 197

Kafka connect JDBC error despite giving schema and payload

I am getting the following error when I run the Kafka Connect JDBC sink connector against PostgreSQL:

JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

However, my topic contains messages with the following structure, with a schema added just as it is presented online:

rowtime: 2022/02/04 12:45:48.520 Z, key: , value: "{"schema": {"type": "struct", "fields": [{"type": "int", "field": "ID", "optional": false}, {"type": "date", "field": "Date", "optional": false}, {"type": "varchar", "field": "ICD", "optional": false}, {"type": "int", "field": "CPT", "optional": false}, {"type": "double", "field": "Cost", "optional": false}], "optional": false, "name": "test"}, "payload": {"ID": "24427934", "Date": "2019-05-22", "ICD": "883.436", "CPT": "60502", "cost": "1374.36"}}", partition: 0

My configuration for the connector is:

 curl -X PUT http://localhost:8083/connectors/claim_test/config \
    -H "Content-Type: application/json" \
    -d '{
     "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
     "connection.url":"jdbc:postgresql://localhost:5432/ae2772",
     "key.converter":"org.apache.kafka.connect.json.JsonConverter",
     "value.converter":"org.apache.kafka.connect.json.JsonConverter",
     "value.converter.schemas.enable":"true",
     "topics":"test_7",
     "auto.create":"true",
     "insert.mode":"insert"
    }'

After some changes, I now get the following message:

WorkerSinkTask{id=claim_test} Error converting message value in topic 'test_9' partition 0 at offset 0 and timestamp 1644005137197: Unknown schema type: int

Upvotes: 1

Views: 1110

Answers (1)

OneCricketeer

Reputation: 191894

int is not a valid schema type. It should be int8, int16, int32, or int64.

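Similarly, date and varchar are not valid either; text fields should be declared as string. (As far as I know, double is accepted by JsonConverter as the name for a 64-bit float, so that field is probably fine.)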

The type names used in the JSON schema are Kafka Connect types, not Postgres or other SQL-specific types (a date should be converted to an int64 Unix epoch timestamp or sent as a string).

You can find the supported schema types here: https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java
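
To make this concrete, here is a minimal Python sketch of how the producer side could build a message with valid type names. It is only an illustration under some assumptions: the VALID_TYPES list is recalled from the Kafka source (to my knowledge JsonConverter spells 64-bit floats as double and 32-bit floats as float in JSON), the helper names invalid_types and build_envelope are hypothetical, and the date is kept as a string for simplicity. It also sends numbers as JSON numbers rather than quoted strings and uses the payload key Cost so that it matches the schema field name exactly, since the question's payload has a lowercase cost.

```python
import json

# Type names JsonConverter accepts in a "schema" block. Assumption: recalled
# from the Kafka source, so verify against your Connect version.
VALID_TYPES = {
    "boolean", "int8", "int16", "int32", "int64", "float", "double",
    "bytes", "string", "array", "map", "struct",
}

# Field types from the schema in the question, in order.
ORIGINAL_TYPES = ["int", "date", "varchar", "int", "double"]


def invalid_types(type_names):
    """Return the type names that are not in VALID_TYPES."""
    return [t for t in type_names if t not in VALID_TYPES]


def build_envelope(record):
    """Wrap one claim record in a schema/payload envelope with valid types."""
    schema = {
        "type": "struct",
        "name": "test",
        "optional": False,
        "fields": [
            {"field": "ID", "type": "int32", "optional": False},
            {"field": "Date", "type": "string", "optional": False},
            {"field": "ICD", "type": "string", "optional": False},
            {"field": "CPT", "type": "int32", "optional": False},
            {"field": "Cost", "type": "double", "optional": False},
        ],
    }
    payload = {
        "ID": int(record["ID"]),        # JSON number, not a quoted string
        "Date": record["Date"],         # kept as an ISO date string
        "ICD": record["ICD"],
        "CPT": int(record["CPT"]),
        "Cost": float(record["cost"]),  # key renamed to match the schema field
    }
    return {"schema": schema, "payload": payload}


if __name__ == "__main__":
    raw = {"ID": "24427934", "Date": "2019-05-22", "ICD": "883.436",
           "CPT": "60502", "cost": "1374.36"}
    print(invalid_types(ORIGINAL_TYPES))
    # Serialize once; the topic should hold this JSON object itself,
    # not a JSON string that contains it.
    print(json.dumps(build_envelope(raw)))
```

With auto.create enabled, a string Date would presumably end up as a text column in Postgres; if a real date column is needed, the Connect logical date type or a transform would be the next thing to look into.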

Upvotes: 1
