Reputation: 2209
I am using debezium kafka connect to stream postgresql database CDC events to kafka cluster. Currently, I am on version 1.9.0 but want to upgrade to version 2.4.0
We are using Kafka outbox pattern to stream database records to Kafka queues.
I am able to upgrade to 2.4.0 but getting a strange warning
Unexpected update message received Struct{id=1} and ignored [io.debezium.transforms.outbox.EventRouterDelegate]
Below is my configuration
{
"value.converter.delegate.converter.type.schemas.enable": "false",
"database.password": "*****",
"database.user": "postgres",
"publication.name": "dbz_outbox_pub",
"slot.name": "debezium2",
"topic.creation.default.cleanup.policy": "compact",
"heartbeat.topic.prefix": "__debezium-heartbeat",
"tasks.max": "1",
"transforms": "outbox",
"topic.creation.default.partitions": "1",
"heartbeat.interval.ms": "60000",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"transforms.outbox.table.field.event.key": "aggregateid",
"publication.autocreate.mode": "filtered",
"topic.creation.default.replication.factor": "1",
"heartbeat.action.query": "CREATE TABLE IF NOT EXISTS debezium_heartbeat (id SERIAL PRIMARY KEY, ts TIMESTAMP WITH TIME ZONE);\nINSERT INTO debezium_heartbeat (id, ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET ts=EXCLUDED.ts;",
"plugin.name": "pgoutput",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"table.include.list": "public.kafkaoutbox,public.debezium_heartbeat",
"topic.creation.default.compression.type": "uncompressed",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"database.dbname": "superipdev_cellb",
"value.converter.delegate.converter.type": "org.apache.kafka.connect.json.JsonConverter",
"database.port": "5432",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.topic.replacement": "${routedByValue}",
"tombstones.on.delete": "false",
"key.converter.delegate.converter.type.schemas.enable": "false",
"event.processing.failure.handling.mode": "warn",
"schema.include.list": "",
"snapshot.mode": "never",
"database.hostname": "host.docker.internal",
"topic.prefix": "cell_b",
"database.server.name": "cell_b",
"name": "debezium2"
}
has anyone come across this before? Any help would be highly appreciated.
Upvotes: 0
Views: 639
Reputation: 397
Seems that you are doing an update on the outbox table. The SMT EventRouterDelegate
on Debezium will ignore all operations except INSERT
as you are not supposed to update the records from the outbox table.
Code for this is here
Upvotes: 0