Reputation: 4920
I am trying to set up a Debezium MySQL Source connector. My goal is to have one topic for each database, so I am investigating the possibility of leveraging subjects, so that a topic can contain different message types whose schemas are stored in the Confluent Schema Registry.
Following several answers here, I have set the key and value converter subject name strategy to io.confluent.kafka.serializers.subject.TopicRecordNameStrategy.
To reroute all the messages coming from the same schema to the same topic, I am using the following configuration:
{
  "name": "aws-db-connector",
  "config": {
    "group.id": "aws-db-group",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "secret-pw",
    "database.server.id": "184054",
    "database.server.name": "aws-db",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.aws-db",
    "database.include.list": "db1,db2",
    "transforms": "unwrap,Reroute",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "db,table,op,source.ts_ms",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)",
    "transforms.Reroute.topic.replacement": "$2_schema",
    "transforms.Reroute.key.field.name": "table",
    "transforms.Reroute.key.field.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)",
    "transforms.Reroute.key.field.replacement": "$3"
  }
}
In my docker-compose file I have set:
- CONNECT_KEY_CONVERTER_KEY_SUBJECT_NAME_STRATEGY=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
- CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
- CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://registry:8081
- CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://registry:8081
For Values, this is working flawlessly.
I can see that my schema registry contains multiple subjects with the format <TopicName>-<RecordName>-Value, where TopicName is the name of the topic I'm rerouting this data to, and RecordName is the "old" topic name created by Debezium, in the format server_name.database_name.table_name.
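In other words, the strategy effectively concatenates the topic name and the record's full name; here is a rough Python sketch of that behavior (the names below are illustrative, not taken from my registry, and the `.Key`/`.Value` suffix comes from Debezium's own schema names, not from the strategy):

```python
def subject_name(topic, record_fullname):
    # TopicRecordNameStrategy: subject = "<topic>-<record full name>".
    # No extra "-key"/"-value" suffix is appended by the strategy itself;
    # Debezium's schema names already end in ".Key" / ".Value".
    return f"{topic}-{record_fullname}"

# Illustrative names: topic after rerouting + Debezium's original record name
print(subject_name("db1_schema", "server.db1.table1.Value"))
# -> db1_schema-server.db1.table1.Value
```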
For Keys, unfortunately, this strategy is not working as expected, and I only have one schema subject: it looks like RecordName contains the new topic name instead of the original one.
This leads to collisions and incompatibility errors if one field name has different types in different tables.
Is there any way to provide the proper RecordName when the key subject is generated?
EDIT - adding example:
Let's suppose my database contains three tables: table1, table2 and table3.
Table1:
CREATE TABLE `table1` (
`id` INT NOT NULL AUTO_INCREMENT,
`name` TEXT,
PRIMARY KEY (`id`)
);
Table2:
CREATE TABLE `table2` (
`id` INT NOT NULL AUTO_INCREMENT,
`name` BINARY,
PRIMARY KEY (`id`)
);
Table3:
CREATE TABLE `table3` (
`id` BINARY NOT NULL,
`name` INT,
PRIMARY KEY (`id`)
);
Running Debezium with the above configuration, it creates a separate Value subject inside the schema registry for each table, but only a single Key subject shared by all of them.
When it's the turn of table3, the Debezium connector fails, because the id column is already registered in the schema registry subject with type int, which is incompatible with the bytes type in table3. Therefore I get this error:
Schema being registered is incompatible with an earlier schema; error code: 409
What I would expect is the creation of separate subjects for the key as well, so that messages with different key schemas can be stored in the same topic.
Upvotes: 2
Views: 1694
Reputation: 4920
This appears to be how Debezium's ByLogicalTableRouter works by default: it creates only one key schema per topic (but different value schemas), so all the messages rerouted to the topic are expected to share the same key structure.
To solve the issue, RegexRouter should be used instead. Applying an InsertField transformation before rerouting also adds the original topic name to the key, making it possible to extract the table name from it.
"transforms": "InsertField,Reroute",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Key",
"transforms.InsertField.topic.field": "table",
"transforms.Reroute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.Reroute.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)",
"transforms.Reroute.replacement": "$2_schema"
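As a sanity check, here is a rough Python simulation of what the two transforms do to a record (the helper names are hypothetical; the real SMTs operate on Connect Struct objects, not plain dicts):

```python
import re

def insert_field_key(record, field="table"):
    # InsertField$Key with topic.field: copies the current (original,
    # pre-reroute) topic name into the key.
    record["key"][field] = record["topic"]
    return record

def regex_router(record,
                 regex=r"(.*\S)\.(.*\S)\.(.*\S)",
                 replacement=r"\g<2>_schema"):
    # RegexRouter rewrites only the topic name; unlike ByLogicalTableRouter
    # it leaves the key and value schemas untouched.
    record["topic"] = re.sub(regex, replacement, record["topic"])
    return record

rec = {"topic": "aws-db.db1.table1", "key": {"id": 1}}
rec = regex_router(insert_field_key(rec))
print(rec["topic"])        # -> db1_schema
print(rec["key"]["table"]) # -> aws-db.db1.table1
```

Because InsertField runs before RegexRouter, the key's table field holds the original Debezium topic name, and since RegexRouter does not rename the key schema, each table keeps its own record name and therefore its own key subject under TopicRecordNameStrategy.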
Upvotes: 1