Reputation: 1428
I am using Debebezium CDC with the outbox transformer and we need it to start up as fast as possible. Since we are using the outbox, there is only ever going to be one table that we are interested in (and that table structure is unlikely to change).
When we use "snapshot.mode": "none" then any binlog changes we capture throws an error as we have not recorded the schema. When we use "snapshot.mode": "schema_only" all tables in the schema are written to "schema.history.internal.kafka.topic", despite limiting this by "table.include.list" - this only contains the one table that we are interested in.
There are many tables in the schema and adding them all to "table.exclude.list" feels impractical.
Is there a fault / something missing in our properties file? We only want to capture the schema for one table we are interested in.
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.include.list": "xxxxxxxxxx",
"database.hostname": "xxxxxxxxxxxx",
"database.port": "3306",
"database.user": "xxxxxx",
"database.password": "xxxxxxxxxxxxxx",
"database.server.id": "184054",
"table.include.list": "xxxxxx.outbox",
"snapshot.mode": "schema_only",
"snapshot.locking.mode": "none",
"schema.history.internal.kafka.bootstrap.servers": "xxxxxxxxxxxxxxxxxxxx",
"schema.history.internal.kafka.topic": "operations.integration.schema.history",
"include.schema.changes": "false",
"tombstones.on.delete": "false",
"topic.prefix": "operations.integration.feeds",
"poll.interval.ms": 100,
"skipped.operations": "u,d,t",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"producer.override.compression.type": "lz4",
"transforms": "outbox,routing,insertMessageSystemHeader,dropHeaders",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.expand.json.payload": "true",
"transforms.outbox.table.fields.additional.placement": "message_id:header:messaging.message.message.id,idempotency_key:header:xxxxxxx.message.idempotency_key,is_canary_request:header:xxxxxxx.message.canary,message_name:header:xxxxxxx.message.name,message_version:header:xxxxxxx.message.version,message_owner:header:xxxxxxx.message.owner,correlation_id:header:xxxxxxx.message.conversation_id,message_format:header:xxxxxxx.message.format",
"transforms.outbox.table.field.event.key": "event_id",
"transforms.outbox.table.field.event.timestamp": "date_updated",
"transforms.outbox.table.field.event.payload": "details",
"transforms.outbox.table.field.event.id": "message_id",
"transforms.outbox.tracing.span.context.field": "correlation_id",
"transforms.outbox.route.topic.replacement": "xxxxxxxxxxxxxxxxxxxx.${routedByValue}",
"transforms.outbox.route.by.field": "target",
"transforms.routing.type": "io.debezium.transforms.partitions.PartitionRouting",
"transforms.routing.partition.payload.fields": "id",
"transforms.routing.partition.topic.num": "10",
"transforms.insertMessageSystemHeader.type": "org.apache.kafka.connect.transforms.InsertHeader",
"transforms.insertMessageSystemHeader.header": "messaging.system",
"transforms.insertMessageSystemHeader.value.literal": "kafka",
"transforms.dropHeaders.type": "org.apache.kafka.connect.transforms.DropHeaders",
"transforms.dropHeaders.headers": "id"
}
Upvotes: 0
Views: 66