Vektor88

Reputation: 4920

Kafka Connect and subject name strategies for Key Converter

I am trying to set up a Debezium MySQL source connector. My goal is to have one topic per database, so I am looking into subject name strategies that would let a single topic carry several message types while each schema is still stored in the Confluent Schema Registry.

Following several answers here, I have set the key and value converter subject name strategy to io.confluent.kafka.serializers.subject.TopicRecordNameStrategy.

To reroute all the messages coming from the same database (schema) to the same topic, I am using the following connector configuration:

{
  "name": "aws-db-connector",
  "config": {
    "group.id": "aws-db-group",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "secret-pw",
    "database.server.id": "184054",
    "database.server.name": "aws-db",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.aws-db",
    "database.include.list": "db1,db2",
    "transforms": "unwrap,Reroute",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "db,table,op,source.ts_ms",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)",
    "transforms.Reroute.topic.replacement": "$2_schema",
    "transforms.Reroute.key.field.name": "table",
    "transforms.Reroute.key.field.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)",
    "transforms.Reroute.key.field.replacement": "$3"
  }
}
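To make the effect of the two Reroute regexes concrete, here is a small Python sketch of what they do to a Debezium topic name. It only mimics the pattern matching, assuming the router requires the whole topic name to match; the helper name `reroute` is made up for illustration, and Java's `$2` replacement syntax is written as `\2` in Python.

```python
import re
from typing import Optional, Tuple

# Same pattern as topic.regex / key.field.regex in the config (JSON escaping removed).
TOPIC_REGEX = re.compile(r"(.*\S)\.(.*\S)\.(.*\S)")


def reroute(original_topic: str) -> Optional[Tuple[str, str]]:
    """Hypothetical helper: return (new_topic, key_field_value), or None if no match."""
    match = TOPIC_REGEX.fullmatch(original_topic)
    if match is None:
        # Assumption: topics that do not match the pattern are left alone.
        return None
    new_topic = match.expand(r"\2_schema")  # "$2_schema" in the connector config
    key_field_value = match.expand(r"\3")   # "$3" in the connector config
    return new_topic, key_field_value


for topic in ("aws-db.db1.table1", "aws-db.db2.orders", "aws-db"):
    print(topic, "->", reroute(topic))
```

With `server_name.database_name.table_name` topics, group 2 is the database name and group 3 is the table name, so every table of `db1` ends up in `db1_schema`.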

In my docker-compose file I have set:

- CONNECT_KEY_CONVERTER_KEY_SUBJECT_NAME_STRATEGY=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
- CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
- CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://registry:8081
- CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://registry:8081

For Values, this is working flawlessly. I can see that my schema registry contains multiple subjects with the format <TopicName>-<RecordName>-Value, where TopicName is the name of the topic I'm rerouting this data to. RecordName is the "old" topic name created by Debezium in the format server_name.database_name.table_name.

For keys, unfortunately, this strategy is not working as expected: I only get one schema subject, and it looks like RecordName contains the new topic name instead of the original one. This leads to collisions and incompatibility errors if a field with the same name has different types in different tables.
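The difference between the two cases can be sketched in Python. TopicRecordNameStrategy builds the subject as the topic name, a hyphen, and the record name; the record names used below are assumptions chosen to match the behaviour described above (values keep a per-table record name, keys get a record name derived from the rerouted topic), not names read from a real registry.

```python
def topic_record_name_subject(topic: str, record_name: str) -> str:
    """Subject name under TopicRecordNameStrategy: '<topic>-<record name>'."""
    return f"{topic}-{record_name}"


TOPIC = "db1_schema"
TABLES = ["table1", "table2", "table3"]

# Assumption: value records keep the original per-table name.
value_subjects = {
    topic_record_name_subject(TOPIC, f"aws-db.db1.{table}.Value") for table in TABLES
}

# Assumption: after rerouting, every key record is named after the new topic.
key_subjects = {
    topic_record_name_subject(TOPIC, f"{TOPIC}.Key") for _ in TABLES
}

print(sorted(value_subjects))  # one subject per table
print(sorted(key_subjects))    # a single shared subject
```

Because all three tables map to the same key subject, their key schemas have to be mutually compatible, which is where the collision comes from.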

Is there any way to provide the proper RecordName when the key subject is generated?

EDIT - adding example:

Let's suppose my database contains three tables, table1, table2 and table3.

Table1:

CREATE TABLE `table1` (
    `id` INT NOT NULL AUTO_INCREMENT,
    `name` TEXT,
    PRIMARY KEY (`id`)
);

Table2:

CREATE TABLE `table2` (
    `id` INT NOT NULL AUTO_INCREMENT,
    `name` BINARY,
    PRIMARY KEY (`id`)
);

Table3:

CREATE TABLE `table3` (
    `id` BINARY NOT NULL,
    `name` INT,
    PRIMARY KEY (`id`)
);

Running Debezium with the above configurations, it creates the following Value subjects inside the schema registry:

And the following Key subject:

When it's the turn of table3, the Debezium connector fails, because the id column is registered in the schema registry subject with type int and it is incompatible with the bytes type in table3. Therefore I get this error:

Schema being registered is incompatible with an earlier schema; error code: 409
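As a rough illustration of why the third table trips the check, the sketch below compares two key schemas field by field. It is a deliberate simplification and not the Schema Registry's actual compatibility algorithm (real Avro resolution allows some type promotions); the schema dictionaries are hand-written approximations of what the keys for table1 and table3 might look like.

```python
def conflicting_fields(registered: dict, candidate: dict) -> list:
    """Return names of fields present in both schemas with different types."""
    registered_types = {f["name"]: f["type"] for f in registered["fields"]}
    return sorted(
        f["name"]
        for f in candidate["fields"]
        if f["name"] in registered_types and registered_types[f["name"]] != f["type"]
    )


# Hand-written approximations of the key schemas (assumed, not taken from a registry).
table1_key = {"type": "record", "name": "Key", "fields": [{"name": "id", "type": "int"}]}
table2_key = {"type": "record", "name": "Key", "fields": [{"name": "id", "type": "int"}]}
table3_key = {"type": "record", "name": "Key", "fields": [{"name": "id", "type": "bytes"}]}

print(conflicting_fields(table1_key, table2_key))  # no conflict: both ids are int
print(conflicting_fields(table1_key, table3_key))  # conflict on the id field
```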

What I would expect is the creation of separate subjects for the key as well:

That way, messages with different key schemas could be stored in the same topic.

Upvotes: 2

Views: 1694

Answers (1)

Vektor88

Reputation: 4920

This appears to be how Debezium works by default. It will create only one key schema per topic, but different value schemas, so all the messages rerouted to the topic should share the same key structure.

To solve the issue, RegexRouter should be used instead. Applying an InsertField transformation before rerouting also adds the original topic name to the key, so the table name can still be extracted from it.

"transforms": "InsertField,Reroute",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Key",
"transforms.InsertField.topic.field": "table",
"transforms.Reroute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.Reroute.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)",
"transforms.Reroute.replacement": "$2_schema",
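The order of the two transforms matters, which a small Python simulation can show. This is only a model of the behaviour described above, with records represented as plain dictionaries; `insert_topic_into_key` and `regex_route` are hypothetical stand-ins for InsertField$Key and RegexRouter, not their real implementations.

```python
import re

# Same pattern as transforms.Reroute.regex (JSON escaping removed).
TOPIC_REGEX = re.compile(r"(.*\S)\.(.*\S)\.(.*\S)")


def insert_topic_into_key(record: dict, field: str = "table") -> dict:
    """Stand-in for InsertField$Key with topic.field: copy the current topic into the key."""
    return {**record, "key": {**record["key"], field: record["topic"]}}


def regex_route(record: dict) -> dict:
    """Stand-in for RegexRouter: rename the topic when the whole name matches."""
    match = TOPIC_REGEX.fullmatch(record["topic"])
    if match is None:
        return record
    return {**record, "topic": match.expand(r"\2_schema")}  # "$2_schema" in the config


record = {"topic": "aws-db.db1.table3", "key": {"id": b"\x01"}}

# InsertField first, then Reroute: the key keeps the original topic name.
routed = regex_route(insert_topic_into_key(record))
table_name = routed["key"]["table"].rsplit(".", 1)[-1]

# Reversed order: the key would only see the already rerouted topic name.
reversed_order = insert_topic_into_key(regex_route(record))

print(routed["topic"], routed["key"]["table"], table_name)
print(reversed_order["key"]["table"])
```

A consumer can recover the table name by taking the last dot-separated segment of the `table` key field, as done for `table_name` above.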

Upvotes: 1
