Reputation: 1282
Debezium supports three types of data change event: create, update, and delete.
I know that the payload of each message Debezium publishes has an op field that identifies the event type. Is there a way, such as an SMT, to route these three types of data change event to different Kafka topics based on the operation type?
Upvotes: 0
Views: 510
Reputation: 32100
As you suggest, a Single Message Transform (SMT) is a good option here. Debezium has a transform called ContentBasedRouter, currently in beta, with which you can code the routing logic in languages including Groovy.
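As a rough sketch of what that might look like, the connector configuration fragment below routes on the op field with a Groovy expression. The property names follow the Debezium content-based routing documentation. The alias `route` and the three target topic names are assumptions chosen for illustration, and a Groovy JSR 223 implementation is assumed to be available on the Kafka Connect classpath.

```properties
# Register the SMT under an arbitrary alias ("route" is an assumed name)
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
# Pick a topic from the op field; the topic names here are hypothetical.
# Returning null leaves the record on its original topic.
transforms.route.topic.expression=value.op == 'c' ? 'ORDERS_CREATES' : (value.op == 'u' ? 'ORDERS_UPDATES' : (value.op == 'd' ? 'ORDERS_DELETES' : null))
```

Because the transform runs inside the connector, the events are routed before they reach Kafka, so no separate stream processing job is needed. Other op values, such as r for snapshot reads, fall through to null and stay on the original topic.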
You can also do this with ksqlDB:
-- Declare source topic from Debezium as ksqlDB stream
CREATE STREAM ORDERS WITH (KAFKA_TOPIC='asgard.demo.ORDERS', VALUE_FORMAT='AVRO');
-- Create three streams (backed by Kafka topics) based on the op-type
CREATE STREAM ORDERS_UPDATES AS SELECT * FROM ORDERS WHERE OP='u';
CREATE STREAM ORDERS_DELETES AS SELECT * FROM ORDERS WHERE OP='d';
CREATE STREAM ORDERS_CREATES AS SELECT * FROM ORDERS WHERE OP='c';
View the resulting topics
ksql> SHOW TOPICS;
Kafka Topic | Partitions | Partition Replicas
-------------------------------------------------------------------------
ORDERS_CREATES | 1 | 1
ORDERS_DELETES | 1 | 1
ORDERS_UPDATES | 1 | 1
Check the counts
ksql> SELECT OP,COUNT(*) AS EVENTS FROM ORDERS GROUP BY OP EMIT CHANGES;
+-------+----------+
|OP |EVENTS |
+-------+----------+
|u |3 |
|c |502 |
|d |5 |
ksql> SELECT 'ORDERS_UPDATES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT
FROM ORDERS_UPDATES GROUP BY 'ORDERS_UPDATES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME |EVENT_COUNT |
+----------------+-------------+
|ORDERS_UPDATES |3 |
Limit Reached
Query terminated
ksql> SELECT 'ORDERS_CREATES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT
FROM ORDERS_CREATES GROUP BY 'ORDERS_CREATES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME |EVENT_COUNT |
+----------------+-------------+
|ORDERS_CREATES |503 |
Limit Reached
Query terminated
ksql> SELECT 'ORDERS_DELETES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT
FROM ORDERS_DELETES GROUP BY 'ORDERS_DELETES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME |EVENT_COUNT |
+----------------+-------------+
|ORDERS_DELETES |5 |
Limit Reached
Query terminated
Upvotes: 2