GrahamB
GrahamB

Reputation: 1428

Debezium mysql connector is capturing schema changes for tables that are not in the table include list

I am using Debezium CDC with the outbox transformer and we need it to start up as fast as possible. Since we are using the outbox, there is only ever going to be one table that we are interested in (and that table's structure is unlikely to change).

When we use "snapshot.mode": "none", any binlog changes we capture throw an error because we have not recorded the schema. When we use "snapshot.mode": "schema_only", all tables in the schema are written to "schema.history.internal.kafka.topic", despite our limiting this with "table.include.list" - that list contains only the one table we are interested in.

There are many tables in the schema and adding them all to "table.exclude.list" feels impractical.

Is there a fault / something missing in our properties file? We only want to capture the schema for one table we are interested in.

{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.include.list": "xxxxxxxxxx",
    "database.hostname": "xxxxxxxxxxxx",
    "database.port": "3306",
    "database.user": "xxxxxx",
    "database.password": "xxxxxxxxxxxxxx",
    "database.server.id": "184054",
    "table.include.list": "xxxxxx.outbox",
    "snapshot.mode": "schema_only",
    "snapshot.locking.mode": "none",
    "schema.history.internal.kafka.bootstrap.servers": "xxxxxxxxxxxxxxxxxxxx",
    "schema.history.internal.kafka.topic": "operations.integration.schema.history",
    "include.schema.changes": "false",
    "tombstones.on.delete": "false",
    "topic.prefix": "operations.integration.feeds",
    "poll.interval.ms": 100,
    "skipped.operations": "u,d,t",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "producer.override.compression.type": "lz4",
    "transforms": "outbox,routing,insertMessageSystemHeader,dropHeaders",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.expand.json.payload": "true",
    "transforms.outbox.table.fields.additional.placement": "message_id:header:messaging.message.message.id,idempotency_key:header:xxxxxxx.message.idempotency_key,is_canary_request:header:xxxxxxx.message.canary,message_name:header:xxxxxxx.message.name,message_version:header:xxxxxxx.message.version,message_owner:header:xxxxxxx.message.owner,correlation_id:header:xxxxxxx.message.conversation_id,message_format:header:xxxxxxx.message.format",
    "transforms.outbox.table.field.event.key": "event_id",
    "transforms.outbox.table.field.event.timestamp": "date_updated",
    "transforms.outbox.table.field.event.payload": "details",
    "transforms.outbox.table.field.event.id": "message_id",
    "transforms.outbox.tracing.span.context.field": "correlation_id",
    "transforms.outbox.route.topic.replacement": "xxxxxxxxxxxxxxxxxxxx.${routedByValue}",
    "transforms.outbox.route.by.field": "target",
    "transforms.routing.type": "io.debezium.transforms.partitions.PartitionRouting",
    "transforms.routing.partition.payload.fields": "id",
    "transforms.routing.partition.topic.num": "10",
    "transforms.insertMessageSystemHeader.type": "org.apache.kafka.connect.transforms.InsertHeader",
    "transforms.insertMessageSystemHeader.header": "messaging.system",
    "transforms.insertMessageSystemHeader.value.literal": "kafka",
    "transforms.dropHeaders.type": "org.apache.kafka.connect.transforms.DropHeaders",
    "transforms.dropHeaders.headers": "id"
}

Upvotes: 0

Views: 66

Answers (0)

Related Questions