mlelievre


Sink from SQL Server to RabbitMQ won't use specified transform

I'm new to Debezium and have set up a Debezium Server instance using Docker. I managed to get Debezium to listen to my SQL Server database and send messages to a RabbitMQ queue.

I am trying to apply a transformation by following the official documentation for the New Record State Extraction transformation (ExtractNewRecordState). It should simplify the payload of the RabbitMQ message, but Debezium still emits its default envelope structure. I see no warnings or errors in my Debezium container's logs.

Here's my application.properties:

# Sink connector config - RabbitMQ
debezium.sink.type=rabbitmq
debezium.sink.rabbitmq.connection.host=rabbitmq
debezium.sink.rabbitmq.connection.port=5672
debezium.sink.rabbitmq.connection.username=guest
debezium.sink.rabbitmq.connection.password=guest
debezium.sink.rabbitmq.connection.virtual.host=/
debezium.sink.rabbitmq.ackTimeout=3000
debezium.sink.rabbitmq.exchange=ProductsX
debezium.sink.rabbitmq.routingKey=

# Source connector config - MSSQL
debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
debezium.source.plugin.name=mssql-connector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=host.docker.internal
debezium.source.database.port=1433
debezium.source.database.user=debezium
debezium.source.database.password=debezium
debezium.source.database.names=ProductsDB
debezium.source.database.server.name=localhost\SQL2019
debezium.source.database.encrypt=false
debezium.source.database.trustServerCertificate=true
debezium.source.table.include.list=dbo.Product
debezium.source.topic.prefix=Products
debezium.source.schema.history.internal=io.debezium.relational.history.MemorySchemaHistory

# Transform config
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState

# Format config
debezium.format.key=json
debezium.format.value=json

# Quarkus
quarkus.log.console.json=false

With this configuration, an update still produces the full change event envelope:

{
    "op": "u",
    "source": {
        ...
    },
    "ts_ms" : "...",
    "ts_us" : "...",
    "ts_ns" : "...",
    "before" : {
        "field1" : "oldvalue1",
        "field2" : "oldvalue2"
    },
    "after" : {
        "field1" : "newvalue1",
        "field2" : "newvalue2"
    }
}

I was expecting something more like:

{
    "field1" : "newvalue1",
    "field2" : "newvalue2"
}
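
To make the expectation concrete, here is a minimal Python sketch of the flattening I assume the transform should perform on the envelope above. It is only an illustration of the expected result, not Debezium's implementation; the `unwrap` helper and the sample event are hypothetical, and the check for a `payload` wrapper is an assumption about messages serialized with schemas enabled.

```python
import json


def unwrap(envelope):
    """Return the 'after' row state of a Debezium-style change event.

    Hypothetical helper: mimics only the basic flattening, with none of
    the transform's options. Returns None when 'after' is null (deletes).
    """
    # Assumption: schema-enabled JSON nests the envelope under "payload".
    body = envelope.get("payload", envelope)
    return body.get("after")


# Sample update event shaped like the output shown above (source and
# timestamp fields omitted for brevity).
event = {
    "op": "u",
    "before": {"field1": "oldvalue1", "field2": "oldvalue2"},
    "after": {"field1": "newvalue1", "field2": "newvalue2"},
}

flattened = unwrap(event)
print(json.dumps(flattened, indent=4))
```

Running a consumed message through a helper like this is also a quick way to confirm whether the queue contains the full envelope or the already flattened record.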

Does anyone see where I could have made a mistake?

Thanks!
