Clover

Reputation: 559

Kafka Connect - How to filter schema metadata from payload

I'm trying to remove the schema from the message payload. Here are my configurations:

connector.properties

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
value.converter=org.apache.kafka.connect.json.JsonConverter
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/employee_db?user=root&password=root
table.whitelist=testemp
mode=incrementing
incrementing.column.name=employee_id
topic.prefix=test-mysql-jdbc-

and below is my worker.properties:

bootstrap.servers=localhost:9092


key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false


internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

offset.flush.interval.ms=10000
plugin.path=C:\Users\name\Desktop\kafka\libs

output:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"employee_id"},{"type":"string","optional":false,"field":"first_name"}],"optional":false,"name":"testemp"},"payload":{"employee_id":2,"first_name":"test"}}

expected output:

{"payload":{"employee_id":2,"first_name":"test"}}

I tried setting value.converter.schemas.enable=false in the worker configuration, as suggested here, but it had no effect.

Am I missing something?

Upvotes: 1

Views: 2972

Answers (1)

Bartosz Wardziński

Reputation: 6593

There are two options to fix it:

  • Remove the value.converter property from your connector configuration (the worker already sets the same converter, so the override is redundant).
  • Set value.converter.schemas.enable=false in your connector configuration.

The schema is added to the message because you have overridden the value converter in the connector configuration without disabling schemas (JsonConverter enables schemas by default). From Kafka Connect's point of view, the connector uses a completely new converter, which does not pick up the value.converter.* properties from the worker configuration.
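
As a sketch of the second option, the connector.properties from the question would gain a single line, with everything else unchanged (the comments are added here only for illustration):

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
# Connector-level converter override; it does not inherit the worker's value.converter.* settings
value.converter=org.apache.kafka.connect.json.JsonConverter
# Disable the schema envelope for this override
value.converter.schemas.enable=false
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/employee_db?user=root&password=root
table.whitelist=testemp
mode=incrementing
incrementing.column.name=employee_id
topic.prefix=test-mysql-jdbc-

For the first option, delete the value.converter line instead, so that the connector falls back to the worker's converter and its value.converter.schemas.enable=false setting.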

Once schemas are disabled, the message contains only the record fields, without the schema/payload envelope:

{
    "employee_id": 2,
    "first_name":"test"
}

Upvotes: 3
