Debezium RabbitMQ outbox event static routing key

I am trying to implement the outbox pattern by reading inserts into a MySQL table and transforming them into events in RabbitMQ.

My Outbox table contains:

For that I am trying Debezium and its Outbox event router transformer.

From what I see in the docs (https://debezium.io/documentation/reference/stable/transformations/outbox-event-router.html#outbox-event-router-property-table-field-event-key), you can configure it to read from your outbox table by telling it which columns hold the event key and the event payload (or rely on the defaults, aggregateid and payload).
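To illustrate the mapping I mean, here is a minimal Python sketch of how I understand the column-to-event mapping. It is not Debezium code: the function `to_event`, its parameters, and the sample row values are hypothetical, and only the default column names (aggregateid, payload) come from the docs.

```python
from typing import Any, Dict, Tuple


def to_event(
    row: Dict[str, Any],
    key_field: str = "aggregateid",
    payload_field: str = "payload",
) -> Tuple[Any, Any]:
    """Return the (key, payload) pair taken from one outbox row.

    key_field     ~ table.field.event.key     (assumed default: aggregateid)
    payload_field ~ table.field.event.payload (assumed default: payload)
    """
    return row[key_field], row[payload_field]


# Hypothetical outbox row; the values are made up for illustration.
row = {"routing_key": "order.created", "payload": '{"id": 1}'}

# Override the key column, as table.field.event.key=routing_key would.
key, payload = to_event(row, key_field="routing_key")
```

With this mapping I would expect the value of the routing_key column to become the key of the emitted event.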

The event is sent to the correct exchange, but the routing key Debezium uses is the one specified in the configuration property debezium.sink.rabbitmq.routingKey; it does not use the value of the column configured in debezium.transforms.outbox.table.field.event.key.

Looking at the source code of the RabbitMQ sink (https://github.com/debezium/debezium-server/blob/main/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamChangeConsumer.java#L138), it takes the routing key from that static configuration property or, if you set debezium.sink.rabbitmq.routingKeyFromTopicName, it uses the topic name (the exchange).
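To make that behaviour concrete, here is a minimal Python model of the routing-key selection as I read it. It is only a sketch, not the actual Debezium code: the function name `resolve_routing_key` and its parameters are hypothetical, and the only inputs that matter are the two settings mentioned above plus the record's topic.

```python
from typing import Optional


def resolve_routing_key(
    topic: str,
    record_key: Optional[str],
    static_routing_key: str,
    routing_key_from_topic_name: bool,
) -> str:
    """Hypothetical model of how the RabbitMQ sink picks the routing key.

    static_routing_key          ~ debezium.sink.rabbitmq.routingKey
    routing_key_from_topic_name ~ debezium.sink.rabbitmq.routingKeyFromTopicName
    record_key is accepted only to show that it is never consulted.
    """
    if routing_key_from_topic_name:
        # The topic name doubles as the routing key.
        return topic
    # Otherwise every event gets the same static value.
    return static_routing_key
```

In this model the record key (the value of my routing_key column) never enters the decision, which matches what I observe.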

Is there a way to send the event to an exchange with a dynamic routing key, based on a column of the source table?

This is the debezium config I am using:

# Sink connector config - RabbitMQ
debezium.sink.type=rabbitmq
debezium.sink.rabbitmq.connection.host=rabbitmq
debezium.sink.rabbitmq.connection.port=5672
debezium.sink.rabbitmq.connection.username=guest
debezium.sink.rabbitmq.connection.password=guest
debezium.sink.rabbitmq.connection.virtual.host=/
debezium.sink.rabbitmq.ackTimeout=3000
debezium.sink.rabbitmq.delivery.mode=2

# Source connector config - MySQL
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.database.hostname=mysql
debezium.source.database.dbname=producer
debezium.source.database.port=8779
debezium.source.database.user=root
debezium.source.database.password=root
debezium.source.table.include.list=producer.outbox
debezium.source.database.server.id=184054
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.topic.prefix=load.test
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=data/schistory.dat
debezium.source.snapshot.mode=initial

# Read schema changes only in debezium.source.database.dbname
schema.history.internal.store.only.captured.databases.ddl=true

# Read and store schema changes on all non-system tables in database
schema.history.internal.store.only.captured.tables.ddl=false

# Format config
debezium.format.key=json
debezium.format.value=json

# Transformations
debezium.transforms=filter,outbox

## FILTER CONFIG

# Filter events that are going to be sent to the sink
debezium.transforms.filter.type=io.debezium.transforms.Filter

# Using Groovy as the language for defining the filter
debezium.transforms.filter.language=jsr223.groovy

# We only want c (create) events (INSERT)
debezium.transforms.filter.condition=value.schema().field("op") && value.getString("op") == "c"

## OUTBOX CONFIG

# Outbox transformation
debezium.transforms.outbox.type=io.debezium.transforms.outbox.EventRouter

# Outbox table column to get the destination exchange. Overrides "aggregatetype"
debezium.transforms.outbox.route.by.field=exchange

# Route to just the value of debezium.transforms.outbox.route.by.field without any prefix
debezium.transforms.outbox.route.topic.regex=(?<routedByValue>.*)
debezium.transforms.outbox.route.topic.replacement=$${routedByValue}

# Table unique identifier
debezium.transforms.outbox.table.field.event.id=uuid

# Event key (routing key for RabbitMQ)
debezium.transforms.outbox.table.field.event.key=routing_key

# Event timestamp
debezium.transforms.outbox.table.field.event.timestamp=produced_timestamp

# Event payload
debezium.transforms.outbox.table.field.event.payload=payload

# Expand json from this: [{\"id\": 1   to  [{"id": 1
debezium.transforms.outbox.table.expand.json.payload=true

# Ignore null values during expansion
debezium.transforms.outbox.table.json.payload.null.behavior=ignore

# Additional properties to be sent in the event: event_type, produced_timestamp (as timestamp)
debezium.transforms.outbox.table.fields.additional.placement=event_type:header,produced_timestamp:header:timestamp

# Error if there is no event_type
debezium.transforms.outbox.table.fields.additional.error.on.missing=true

# Null payload does not emit a tombstone event
debezium.transforms.outbox.route.tombstone.on.empty.payload=false

# If there is an UPDATE to outbox table, logs error and continues consuming
debezium.transforms.outbox.table.op.invalid.behavior=error


# Quarkus configuration
quarkus.log.level=INFO
quarkus.log.console.json=false

Thanks!
