Reputation: 1
I'm new to Debezium and have set up a Debezium Server using Docker. I managed to get Debezium to listen to my SQL Server and send message to a RabbitMQ queue.
I was trying to use a transformation by following the official documentation to implement the New Record State Extraction transformations. It should simplify the payload of the RabbitMQ message but Debezium still uses its default structure. I see no warning nor error in my Debezium container's logs.
Here's my application.properties:
# Sink connector config - RabbitMQ
debezium.sink.type=rabbitmq
debezium.sink.rabbitmq.connection.host=rabbitmq
debezium.sink.rabbitmq.connection.port=5672
debezium.sink.rabbitmq.connection.username=guest
debezium.sink.rabbitmq.connection.password=guest
debezium.sink.rabbitmq.connection.virtual.host=/
debezium.sink.rabbitmq.ackTimeout=3000
debezium.sink.rabbitmq.exchange=ProductsX
debezium.sink.rabbitmq.routingKey=
# Source connector config - MSSQL
debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
debezium.source.plugin.name=mssql-connector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=host.docker.internal
debezium.source.database.port=1433
debezium.source.database.user=debezium
debezium.source.database.password=debezium
debezium.source.database.names=ProductsDB
debezium.source.database.server.name=localhost\SQL2019
debezium.source.database.encrypt=false
debezium.source.database.trustServerCertificate=true
debezium.source.table.include.list=dbo.Product
debezium.source.topic.prefix=Products
debezium.source.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory
# Transform config
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# Format config
debezium.format.key=json
debezium.format.value=json
# Quarkus
quarkus.log.console.json=false
This configuration still outputs:
{
"op": "u",
"source": {
...
},
"ts_ms" : "...",
"ts_us" : "...",
"ts_ns" : "...",
"before" : {
"field1" : "oldvalue1",
"field2" : "oldvalue2"
},
"after" : {
"field1" : "newvalue1",
"field2" : "newvalue2"
}
}
Where I was expecting something more like:
{
"field1" : "newvalue1",
"field2" : "newvalue2"
}
Does anyone see where I could have made a mistake?
Thanks!
Upvotes: 0
Views: 77