user8510613

Can the Debezium MySQL connector route data change events to different topics by operation type?

Debezium supports three types of data change events: create, update, and delete.

I know that the payload of each message Debezium publishes has an op field that identifies the event type. Is there any way, such as an SMT, to route these three types of data change events to different Kafka topics based on the operation type?

Answers (1)

Robin Moffatt

Single Message Transform

As you suggest, a Single Message Transform is a good option here. Debezium has a transform called ContentBasedRouter, currently in beta, that lets you code the routing logic in a scripting language such as Groovy.
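As a rough sketch of what that could look like in the connector configuration (the alias `route` and the target topic names are made up for illustration, and this assumes the Debezium scripting SMT and a Groovy JSR 223 implementation are available on the Connect plugin path):

```properties
# Hypothetical alias "route"; any name works
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
# Illustrative topic names. Returning null leaves the record on its original topic,
# which here covers any op other than c/u/d (for example snapshot reads, op 'r').
transforms.route.topic.expression=value.op == 'c' ? 'orders_creates' : (value.op == 'u' ? 'orders_updates' : (value.op == 'd' ? 'orders_deletes' : null))
```

Check the Debezium documentation for your version before relying on this, since the transform is in beta and its options may change.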

ksqlDB

You can do this with ksqlDB:

-- Declare source topic from Debezium as ksqlDB stream
CREATE STREAM ORDERS WITH (KAFKA_TOPIC='asgard.demo.ORDERS', VALUE_FORMAT='AVRO');

-- Create three streams (backed by Kafka topics) based on the op-type
CREATE STREAM ORDERS_UPDATES AS SELECT * FROM ORDERS WHERE OP='u';
CREATE STREAM ORDERS_DELETES AS SELECT * FROM ORDERS WHERE OP='d';
CREATE STREAM ORDERS_CREATES AS SELECT * FROM ORDERS WHERE OP='c';

List the resulting topics

ksql> SHOW TOPICS;

 Kafka Topic                           | Partitions | Partition Replicas
-------------------------------------------------------------------------
 ORDERS_CREATES                        | 1          | 1
 ORDERS_DELETES                        | 1          | 1
 ORDERS_UPDATES                        | 1          | 1

Check the counts

ksql> SELECT OP,COUNT(*) AS EVENTS FROM ORDERS GROUP BY OP EMIT CHANGES;
+-------+----------+
|OP     |EVENTS    |
+-------+----------+
|u      |3         |
|c      |502       |
|d      |5         |

ksql> SELECT 'ORDERS_UPDATES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT 
        FROM ORDERS_UPDATES GROUP BY 'ORDERS_UPDATES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME      |EVENT_COUNT  |
+----------------+-------------+
|ORDERS_UPDATES  |3            |
Limit Reached
Query terminated

ksql> SELECT 'ORDERS_CREATES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT 
        FROM ORDERS_CREATES GROUP BY 'ORDERS_CREATES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME      |EVENT_COUNT  |
+----------------+-------------+
|ORDERS_CREATES  |503          |
Limit Reached
Query terminated

ksql> SELECT 'ORDERS_DELETES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT 
        FROM ORDERS_DELETES GROUP BY 'ORDERS_DELETES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME      |EVENT_COUNT  |
+----------------+-------------+
|ORDERS_DELETES  |5            |
Limit Reached
Query terminated
